Introduce abstract security transport testcase (#33878)

This commit introduces an AbstractSimpleSecurityTransportTestCase for
security transports. This classes provides transport tests that are
specific for security transports. Additionally, it fixes the tests referenced in
#33285.
This commit is contained in:
Tim Brooks 2018-09-24 09:44:44 -06:00 committed by GitHub
parent df333ca305
commit 78e483e8d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 203 additions and 311 deletions

View File

@ -58,7 +58,7 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
@Override @Override
protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException, public Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException,
InterruptedException { InterruptedException {
if (doHandshake) { if (doHandshake) {
return super.executeHandshake(node, channel, timeout); return super.executeHandshake(node, channel, timeout);

View File

@ -62,7 +62,7 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase {
new NoneCircuitBreakerService()) { new NoneCircuitBreakerService()) {
@Override @Override
protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException, public Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException,
InterruptedException { InterruptedException {
if (doHandshake) { if (doHandshake) {
return super.executeHandshake(node, channel, timeout); return super.executeHandshake(node, channel, timeout);

View File

@ -324,7 +324,7 @@ public class ConnectionManager implements Closeable {
} }
} }
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) { public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings); int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings);
int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings); int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings);
int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings); int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings);

View File

@ -1492,7 +1492,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
} }
} }
protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) public Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout)
throws IOException, InterruptedException { throws IOException, InterruptedException {
numHandshakes.inc(); numHandshakes.inc();
final long requestId = responseHandlers.newRequestId(); final long requestId = responseHandlers.newRequestId();

View File

@ -173,7 +173,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
return service; return service;
} }
private MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) { protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) {
return buildService(name, version, clusterSettings, Settings.EMPTY, true, true); return buildService(name, version, clusterSettings, Settings.EMPTY, true, true);
} }

View File

@ -40,7 +40,7 @@ public class MockTcpTransportTests extends AbstractSimpleTransportTestCase {
Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE, Transport transport = new MockTcpTransport(settings, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version) { new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version) {
@Override @Override
protected Version executeHandshake(DiscoveryNode node, TcpChannel mockChannel, TimeValue timeout) throws IOException, public Version executeHandshake(DiscoveryNode node, TcpChannel mockChannel, TimeValue timeout) throws IOException,
InterruptedException { InterruptedException {
if (doHandshake) { if (doHandshake) {
return super.executeHandshake(node, mockChannel, timeout); return super.executeHandshake(node, mockChannel, timeout);

View File

@ -62,7 +62,7 @@ public class SimpleMockNioTransportTests extends AbstractSimpleTransportTestCase
new NoneCircuitBreakerService()) { new NoneCircuitBreakerService()) {
@Override @Override
protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException, public Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException,
InterruptedException { InterruptedException {
if (doHandshake) { if (doHandshake) {
return super.executeHandshake(node, channel, timeout); return super.executeHandshake(node, channel, timeout);

View File

@ -0,0 +1,179 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.common.socket.SocketAccess;
import org.elasticsearch.xpack.core.ssl.SSLConfiguration;
import org.elasticsearch.xpack.core.ssl.SSLService;
import javax.net.SocketFactory;
import javax.net.ssl.HandshakeCompletedListener;
import javax.net.ssl.SSLSocket;
import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
public abstract class AbstractSimpleSecurityTransportTestCase extends AbstractSimpleTransportTestCase {
protected SSLService createSSLService() {
return createSSLService(Settings.EMPTY);
}
protected SSLService createSSLService(Settings settings) {
Path testnodeCert = getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode.crt");
Path testnodeKey = getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode.pem");
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString("xpack.ssl.secure_key_passphrase", "testnode");
Settings settings1 = Settings.builder()
.put(settings)
.put("xpack.security.transport.ssl.enabled", true)
.put("xpack.ssl.key", testnodeKey)
.put("xpack.ssl.certificate", testnodeCert)
.put("path.home", createTempDir())
.setSecureSettings(secureSettings)
.build();
try {
return new SSLService(settings1, TestEnvironment.newEnvironment(settings1));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void testConnectException() throws UnknownHostException {
try {
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),
emptyMap(), emptySet(), Version.CURRENT));
fail("Expected ConnectTransportException");
} catch (ConnectTransportException e) {
assertThat(e.getMessage(), containsString("connect_exception"));
assertThat(e.getMessage(), containsString("[127.0.0.1:9876]"));
Throwable cause = e.getCause();
assertThat(cause, instanceOf(IOException.class));
}
}
public void testBindUnavailableAddress() {
// this is on a lower level since it needs access to the TransportService before it's started
int port = serviceA.boundAddress().publishAddress().getPort();
Settings settings = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), "foobar")
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
.put("transport.tcp.port", port)
.build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
BindTransportException bindTransportException = expectThrows(BindTransportException.class, () -> {
MockTransportService transportService = build(settings, Version.CURRENT, clusterSettings, true);
try {
transportService.start();
} finally {
transportService.stop();
transportService.close();
}
});
assertEquals("Failed to bind to [" + port + "]", bindTransportException.getMessage());
}
@Override
public void testTcpHandshake() throws IOException, InterruptedException {
assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport);
TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport();
ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY);
try (TransportService service = buildService("TS_TPC", Version.CURRENT, null);
TcpTransport.NodeChannels connection = originalTransport.openConnection(
new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0),
connectionProfile)) {
Version version = originalTransport.executeHandshake(connection.getNode(),
connection.channel(TransportRequestOptions.Type.PING), TimeValue.timeValueSeconds(10));
assertEquals(version, Version.CURRENT);
}
}
@SuppressForbidden(reason = "Need to open socket connection")
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33772")
public void testRenegotiation() throws Exception {
SSLService sslService = createSSLService();
final SSLConfiguration sslConfiguration = sslService.getSSLConfiguration("xpack.ssl");
SocketFactory factory = sslService.sslSocketFactory(sslConfiguration);
try (SSLSocket socket = (SSLSocket) factory.createSocket()) {
SocketAccess.doPrivileged(() -> socket.connect(serviceA.boundAddress().publishAddress().address()));
CountDownLatch handshakeLatch = new CountDownLatch(1);
HandshakeCompletedListener firstListener = event -> handshakeLatch.countDown();
socket.addHandshakeCompletedListener(firstListener);
socket.startHandshake();
handshakeLatch.await();
socket.removeHandshakeCompletedListener(firstListener);
OutputStreamStreamOutput stream = new OutputStreamStreamOutput(socket.getOutputStream());
stream.writeByte((byte) 'E');
stream.writeByte((byte) 'S');
stream.writeInt(-1);
stream.flush();
CountDownLatch renegotiationLatch = new CountDownLatch(1);
HandshakeCompletedListener secondListener = event -> renegotiationLatch.countDown();
socket.addHandshakeCompletedListener(secondListener);
socket.startHandshake();
AtomicReference<Exception> error = new AtomicReference<>();
CountDownLatch catchReadErrorsLatch = new CountDownLatch(1);
Thread renegotiationThread = new Thread(() -> {
try {
socket.setSoTimeout(50);
socket.getInputStream().read();
} catch (SocketTimeoutException e) {
// Ignore. We expect a timeout.
} catch (IOException e) {
error.set(e);
} finally {
catchReadErrorsLatch.countDown();
}
});
renegotiationThread.start();
renegotiationLatch.await();
socket.removeHandshakeCompletedListener(secondListener);
catchReadErrorsLatch.await();
assertNull(error.get());
stream.writeByte((byte) 'E');
stream.writeByte((byte) 'S');
stream.writeInt(-1);
stream.flush();
}
}
}

View File

@ -14,23 +14,16 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpChannel;
@ -38,39 +31,26 @@ import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.common.socket.SocketAccess;
import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport;
import org.elasticsearch.xpack.core.ssl.SSLConfiguration;
import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.core.ssl.SSLService;
import org.elasticsearch.xpack.security.transport.AbstractSimpleSecurityTransportTestCase;
import javax.net.SocketFactory;
import javax.net.ssl.HandshakeCompletedListener;
import javax.net.ssl.SNIHostName; import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIMatcher; import javax.net.ssl.SNIMatcher;
import javax.net.ssl.SNIServerName; import javax.net.ssl.SNIServerName;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters; import javax.net.ssl.SSLParameters;
import javax.net.ssl.SSLSocket;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.xpack.core.security.SecurityField.setting; import static org.elasticsearch.xpack.core.security.SecurityField.setting;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
public class SimpleSecurityNetty4TransportTests extends AbstractSimpleTransportTestCase { public class SimpleSecurityNetty4ServerTransportTests extends AbstractSimpleSecurityTransportTestCase {
private static final ConnectionProfile SINGLE_CHANNEL_PROFILE; private static final ConnectionProfile SINGLE_CHANNEL_PROFILE;
@ -85,25 +65,6 @@ public class SimpleSecurityNetty4TransportTests extends AbstractSimpleTransportT
SINGLE_CHANNEL_PROFILE = builder.build(); SINGLE_CHANNEL_PROFILE = builder.build();
} }
private SSLService createSSLService() {
Path testnodeCert = getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode.crt");
Path testnodeKey = getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode.pem");
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString("xpack.ssl.secure_key_passphrase", "testnode");
Settings settings = Settings.builder()
.put("xpack.security.transport.ssl.enabled", true)
.put("xpack.ssl.key", testnodeKey)
.put("xpack.ssl.certificate", testnodeCert)
.put("path.home", createTempDir())
.setSecureSettings(secureSettings)
.build();
try {
return new SSLService(settings, TestEnvironment.newEnvironment(settings));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version, public MockTransportService nettyFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
ClusterSettings clusterSettings, boolean doHandshake) { ClusterSettings clusterSettings, boolean doHandshake) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
@ -111,12 +72,12 @@ public class SimpleSecurityNetty4TransportTests extends AbstractSimpleTransportT
Settings settings1 = Settings.builder() Settings settings1 = Settings.builder()
.put(settings) .put(settings)
.put("xpack.security.transport.ssl.enabled", true).build(); .put("xpack.security.transport.ssl.enabled", true).build();
Transport transport = new SecurityNetty4Transport(settings1, threadPool, Transport transport = new SecurityNetty4ServerTransport(settings1, threadPool,
networkService, BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, networkService, BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry,
new NoneCircuitBreakerService(), createSSLService()) { new NoneCircuitBreakerService(), null, createSSLService(settings1)) {
@Override @Override
protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException, public Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException,
InterruptedException { InterruptedException {
if (doHandshake) { if (doHandshake) {
return super.executeHandshake(node, channel, timeout); return super.executeHandshake(node, channel, timeout);
@ -140,118 +101,16 @@ public class SimpleSecurityNetty4TransportTests extends AbstractSimpleTransportT
@Override @Override
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) { protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
settings = Settings.builder().put(settings) if (TcpTransport.PORT.exists(settings) == false) {
.put(TcpTransport.PORT.getKey(), "0") settings = Settings.builder().put(settings)
.build(); .put(TcpTransport.PORT.getKey(), "0")
.build();
}
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake); MockTransportService transportService = nettyFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake);
transportService.start(); transportService.start();
return transportService; return transportService;
} }
public void testConnectException() throws UnknownHostException {
try {
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),
emptyMap(), emptySet(), Version.CURRENT));
fail("Expected ConnectTransportException");
} catch (ConnectTransportException e) {
assertThat(e.getMessage(), containsString("connect_exception"));
assertThat(e.getMessage(), containsString("[127.0.0.1:9876]"));
Throwable cause = e.getCause();
assertThat(cause, instanceOf(IOException.class));
}
}
public void testBindUnavailableAddress() {
// this is on a lower level since it needs access to the TransportService before it's started
int port = serviceA.boundAddress().publishAddress().getPort();
Settings settings = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), "foobar")
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
.put("transport.tcp.port", port)
.build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
BindTransportException bindTransportException = expectThrows(BindTransportException.class, () -> {
MockTransportService transportService = nettyFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings, true);
try {
transportService.start();
} finally {
transportService.stop();
transportService.close();
}
});
assertEquals("Failed to bind to [" + port + "]", bindTransportException.getMessage());
}
@SuppressForbidden(reason = "Need to open socket connection")
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33772")
public void testRenegotiation() throws Exception {
SSLService sslService = createSSLService();
final SSLConfiguration sslConfiguration = sslService.getSSLConfiguration("xpack.ssl");
SocketFactory factory = sslService.sslSocketFactory(sslConfiguration);
try (SSLSocket socket = (SSLSocket) factory.createSocket()) {
SocketAccess.doPrivileged(() -> socket.connect(serviceA.boundAddress().publishAddress().address()));
CountDownLatch handshakeLatch = new CountDownLatch(1);
HandshakeCompletedListener firstListener = event -> handshakeLatch.countDown();
socket.addHandshakeCompletedListener(firstListener);
socket.startHandshake();
handshakeLatch.await();
socket.removeHandshakeCompletedListener(firstListener);
OutputStreamStreamOutput stream = new OutputStreamStreamOutput(socket.getOutputStream());
stream.writeByte((byte) 'E');
stream.writeByte((byte) 'S');
stream.writeInt(-1);
stream.flush();
socket.startHandshake();
CountDownLatch renegotiationLatch = new CountDownLatch(1);
HandshakeCompletedListener secondListener = event -> renegotiationLatch.countDown();
socket.addHandshakeCompletedListener(secondListener);
AtomicReference<Exception> error = new AtomicReference<>();
CountDownLatch catchReadErrorsLatch = new CountDownLatch(1);
Thread renegotiationThread = new Thread(() -> {
try {
socket.setSoTimeout(50);
socket.getInputStream().read();
} catch (SocketTimeoutException e) {
// Ignore. We expect a timeout.
} catch (IOException e) {
error.set(e);
} finally {
catchReadErrorsLatch.countDown();
}
});
renegotiationThread.start();
renegotiationLatch.await();
socket.removeHandshakeCompletedListener(secondListener);
catchReadErrorsLatch.await();
assertNull(error.get());
stream.writeByte((byte) 'E');
stream.writeByte((byte) 'S');
stream.writeInt(-1);
stream.flush();
}
}
// TODO: These tests currently rely on plaintext transports
@Override
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33285")
public void testTcpHandshake() {
}
// TODO: These tests as configured do not currently work with the security transport
@Override
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33285")
public void testTransportProfilesWithPortAndHost() {
}
public void testSNIServerNameIsPropagated() throws Exception { public void testSNIServerNameIsPropagated() throws Exception {
SSLService sslService = createSSLService(); SSLService sslService = createSSLService();
final ServerBootstrap serverBootstrap = new ServerBootstrap(); final ServerBootstrap serverBootstrap = new ServerBootstrap();

View File

@ -7,70 +7,25 @@ package org.elasticsearch.xpack.security.transport.nio;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractSimpleTransportTestCase;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.security.transport.AbstractSimpleSecurityTransportTestCase;
import org.elasticsearch.xpack.core.common.socket.SocketAccess;
import org.elasticsearch.xpack.core.ssl.SSLConfiguration;
import org.elasticsearch.xpack.core.ssl.SSLService;
import javax.net.SocketFactory;
import javax.net.ssl.HandshakeCompletedListener;
import javax.net.ssl.SSLSocket;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap; public class SimpleSecurityNioTransportTests extends AbstractSimpleSecurityTransportTestCase {
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
public class SimpleSecurityNioTransportTests extends AbstractSimpleTransportTestCase {
private SSLService createSSLService() {
Path testnodeCert = getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode.crt");
Path testnodeKey = getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode.pem");
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString("xpack.ssl.secure_key_passphrase", "testnode");
Settings settings = Settings.builder()
.put("xpack.security.transport.ssl.enabled", true)
.put("xpack.ssl.key", testnodeKey)
.put("xpack.ssl.certificate", testnodeCert)
.put("path.home", createTempDir())
.setSecureSettings(secureSettings)
.build();
try {
return new SSLService(settings, TestEnvironment.newEnvironment(settings));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public MockTransportService nioFromThreadPool(Settings settings, ThreadPool threadPool, final Version version, public MockTransportService nioFromThreadPool(Settings settings, ThreadPool threadPool, final Version version,
ClusterSettings clusterSettings, boolean doHandshake) { ClusterSettings clusterSettings, boolean doHandshake) {
@ -81,10 +36,10 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleTransportTest
.put("xpack.security.transport.ssl.enabled", true).build(); .put("xpack.security.transport.ssl.enabled", true).build();
Transport transport = new SecurityNioTransport(settings1, threadPool, Transport transport = new SecurityNioTransport(settings1, threadPool,
networkService, BigArrays.NON_RECYCLING_INSTANCE, new MockPageCacheRecycler(settings), namedWriteableRegistry, networkService, BigArrays.NON_RECYCLING_INSTANCE, new MockPageCacheRecycler(settings), namedWriteableRegistry,
new NoneCircuitBreakerService(), null, createSSLService()) { new NoneCircuitBreakerService(), null, createSSLService(settings1)) {
@Override @Override
protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException, public Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException,
InterruptedException { InterruptedException {
if (doHandshake) { if (doHandshake) {
return super.executeHandshake(node, channel, timeout); return super.executeHandshake(node, channel, timeout);
@ -108,114 +63,13 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleTransportTest
@Override @Override
protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) { protected MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake) {
settings = Settings.builder().put(settings) if (TcpTransport.PORT.exists(settings) == false) {
settings = Settings.builder().put(settings)
.put(TcpTransport.PORT.getKey(), "0") .put(TcpTransport.PORT.getKey(), "0")
.build(); .build();
}
MockTransportService transportService = nioFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake); MockTransportService transportService = nioFromThreadPool(settings, threadPool, version, clusterSettings, doHandshake);
transportService.start(); transportService.start();
return transportService; return transportService;
} }
public void testConnectException() throws UnknownHostException {
try {
serviceA.connectToNode(new DiscoveryNode("C", new TransportAddress(InetAddress.getByName("localhost"), 9876),
emptyMap(), emptySet(), Version.CURRENT));
fail("Expected ConnectTransportException");
} catch (ConnectTransportException e) {
assertThat(e.getMessage(), containsString("connect_exception"));
assertThat(e.getMessage(), containsString("[127.0.0.1:9876]"));
Throwable cause = e.getCause();
assertThat(cause, instanceOf(IOException.class));
}
}
public void testBindUnavailableAddress() {
// this is on a lower level since it needs access to the TransportService before it's started
int port = serviceA.boundAddress().publishAddress().getPort();
Settings settings = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), "foobar")
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
.put("transport.tcp.port", port)
.build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
BindTransportException bindTransportException = expectThrows(BindTransportException.class, () -> {
MockTransportService transportService = nioFromThreadPool(settings, threadPool, Version.CURRENT, clusterSettings, true);
try {
transportService.start();
} finally {
transportService.stop();
transportService.close();
}
});
assertEquals("Failed to bind to [" + port + "]", bindTransportException.getMessage());
}
@SuppressForbidden(reason = "Need to open socket connection")
public void testRenegotiation() throws Exception {
SSLService sslService = createSSLService();
final SSLConfiguration sslConfiguration = sslService.getSSLConfiguration("xpack.ssl");
SocketFactory factory = sslService.sslSocketFactory(sslConfiguration);
try (SSLSocket socket = (SSLSocket) factory.createSocket()) {
SocketAccess.doPrivileged(() -> socket.connect(serviceA.boundAddress().publishAddress().address()));
CountDownLatch handshakeLatch = new CountDownLatch(1);
HandshakeCompletedListener firstListener = event -> handshakeLatch.countDown();
socket.addHandshakeCompletedListener(firstListener);
socket.startHandshake();
handshakeLatch.await();
socket.removeHandshakeCompletedListener(firstListener);
OutputStreamStreamOutput stream = new OutputStreamStreamOutput(socket.getOutputStream());
stream.writeByte((byte) 'E');
stream.writeByte((byte) 'S');
stream.writeInt(-1);
stream.flush();
socket.startHandshake();
CountDownLatch renegotiationLatch = new CountDownLatch(1);
HandshakeCompletedListener secondListener = event -> renegotiationLatch.countDown();
socket.addHandshakeCompletedListener(secondListener);
AtomicReference<Exception> error = new AtomicReference<>();
CountDownLatch catchReadErrorsLatch = new CountDownLatch(1);
Thread renegotiationThread = new Thread(() -> {
try {
socket.setSoTimeout(50);
socket.getInputStream().read();
} catch (SocketTimeoutException e) {
// Ignore. We expect a timeout.
} catch (IOException e) {
error.set(e);
} finally {
catchReadErrorsLatch.countDown();
}
});
renegotiationThread.start();
renegotiationLatch.await();
socket.removeHandshakeCompletedListener(secondListener);
catchReadErrorsLatch.await();
assertNull(error.get());
stream.writeByte((byte) 'E');
stream.writeByte((byte)'S');
stream.writeInt(-1);
stream.flush();
}
}
// TODO: These tests currently rely on plaintext transports
@Override
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33285")
public void testTcpHandshake() throws IOException, InterruptedException {
}
// TODO: These tests as configured do not currently work with the security transport
@Override
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33285")
public void testTransportProfilesWithPortAndHost() {
}
} }