Fix unit tests to bind to port 0.

I will followup with ITs and other modules. By fixing this, these tests become more reliable (will never sporatically
fail due to other stuff on your machine: ports are assigned by the OS), and it allows us to move forward with
gradle parallel builds, in my tests this is a nice speedup, but we can't do it until tests are cleaned up
This commit is contained in:
Robert Muir 2015-11-30 17:22:58 -05:00
parent cc627e41cc
commit 44f21b24d7
6 changed files with 52 additions and 140 deletions

View File

@ -91,7 +91,10 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
} }
public void testThatHttpPipeliningWorksWhenEnabled() throws Exception { public void testThatHttpPipeliningWorksWhenEnabled() throws Exception {
Settings settings = settingsBuilder().put("http.pipelining", true).build(); Settings settings = settingsBuilder()
.put("http.pipelining", true)
.put("http.port", "0")
.build();
httpServerTransport = new CustomNettyHttpServerTransport(settings); httpServerTransport = new CustomNettyHttpServerTransport(settings);
httpServerTransport.start(); httpServerTransport.start();
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
@ -105,7 +108,10 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
} }
public void testThatHttpPipeliningCanBeDisabled() throws Exception { public void testThatHttpPipeliningCanBeDisabled() throws Exception {
Settings settings = settingsBuilder().put("http.pipelining", false).build(); Settings settings = settingsBuilder()
.put("http.pipelining", false)
.put("http.port", "0")
.build();
httpServerTransport = new CustomNettyHttpServerTransport(settings); httpServerTransport = new CustomNettyHttpServerTransport(settings);
httpServerTransport.start(); httpServerTransport.start();
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());

View File

@ -76,8 +76,6 @@ public class HttpPipeliningHandlerTests extends ESTestCase {
private static final long RESPONSE_TIMEOUT = 10000L; private static final long RESPONSE_TIMEOUT = 10000L;
private static final long CONNECTION_TIMEOUT = 10000L; private static final long CONNECTION_TIMEOUT = 10000L;
private static final String CONTENT_TYPE_TEXT = "text/plain; charset=UTF-8"; private static final String CONTENT_TYPE_TEXT = "text/plain; charset=UTF-8";
// TODO make me random
private static final InetSocketAddress HOST_ADDR = new InetSocketAddress(InetAddress.getLoopbackAddress(), 9080);
private static final String PATH1 = "/1"; private static final String PATH1 = "/1";
private static final String PATH2 = "/2"; private static final String PATH2 = "/2";
private static final String SOME_RESPONSE_TEXT = "some response for "; private static final String SOME_RESPONSE_TEXT = "some response for ";
@ -90,6 +88,8 @@ public class HttpPipeliningHandlerTests extends ESTestCase {
private HashedWheelTimer timer; private HashedWheelTimer timer;
private InetSocketAddress boundAddress;
@Before @Before
public void startBootstraps() { public void startBootstraps() {
clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory()); clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());
@ -118,7 +118,8 @@ public class HttpPipeliningHandlerTests extends ESTestCase {
} }
}); });
serverBootstrap.bind(HOST_ADDR); Channel channel = serverBootstrap.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
boundAddress = (InetSocketAddress) channel.getLocalAddress();
timer = new HashedWheelTimer(); timer = new HashedWheelTimer();
} }
@ -137,7 +138,7 @@ public class HttpPipeliningHandlerTests extends ESTestCase {
responsesIn = new CountDownLatch(1); responsesIn = new CountDownLatch(1);
responses.clear(); responses.clear();
final ChannelFuture connectionFuture = clientBootstrap.connect(HOST_ADDR); final ChannelFuture connectionFuture = clientBootstrap.connect(boundAddress);
assertTrue(connectionFuture.await(CONNECTION_TIMEOUT)); assertTrue(connectionFuture.await(CONNECTION_TIMEOUT));
final Channel clientChannel = connectionFuture.getChannel(); final Channel clientChannel = connectionFuture.getChannel();
@ -145,11 +146,11 @@ public class HttpPipeliningHandlerTests extends ESTestCase {
// NetworkAddress.formatAddress makes a proper HOST header. // NetworkAddress.formatAddress makes a proper HOST header.
final HttpRequest request1 = new DefaultHttpRequest( final HttpRequest request1 = new DefaultHttpRequest(
HTTP_1_1, HttpMethod.GET, PATH1); HTTP_1_1, HttpMethod.GET, PATH1);
request1.headers().add(HOST, NetworkAddress.formatAddress(HOST_ADDR)); request1.headers().add(HOST, NetworkAddress.formatAddress(boundAddress));
final HttpRequest request2 = new DefaultHttpRequest( final HttpRequest request2 = new DefaultHttpRequest(
HTTP_1_1, HttpMethod.GET, PATH2); HTTP_1_1, HttpMethod.GET, PATH2);
request2.headers().add(HOST, NetworkAddress.formatAddress(HOST_ADDR)); request2.headers().add(HOST, NetworkAddress.formatAddress(boundAddress));
clientChannel.write(request1); clientChannel.write(request1);
clientChannel.write(request2); clientChannel.write(request2);

View File

@ -50,7 +50,11 @@ import static org.hamcrest.Matchers.is;
*/ */
public class NettySizeHeaderFrameDecoderTests extends ESTestCase { public class NettySizeHeaderFrameDecoderTests extends ESTestCase {
private final Settings settings = settingsBuilder().put("name", "foo").put("transport.host", "127.0.0.1").build(); private final Settings settings = settingsBuilder()
.put("name", "foo")
.put("transport.host", "127.0.0.1")
.put("transport.tcp.port", "0")
.build();
private ThreadPool threadPool; private ThreadPool threadPool;
private NettyTransport nettyTransport; private NettyTransport nettyTransport;

View File

@ -49,9 +49,7 @@ public class NettyScheduledPingTests extends ESTestCase {
public void testScheduledPing() throws Exception { public void testScheduledPing() throws Exception {
ThreadPool threadPool = new ThreadPool(getClass().getName()); ThreadPool threadPool = new ThreadPool(getClass().getName());
int startPort = 11000 + randomIntBetween(0, 255); Settings settings = Settings.builder().put(NettyTransport.PING_SCHEDULE, "5ms").put("transport.tcp.port", 0).build();
int endPort = startPort + 10;
Settings settings = Settings.builder().put(NettyTransport.PING_SCHEDULE, "5ms").put("transport.tcp.port", startPort + "-" + endPort).build();
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry()); final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool); MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool);

View File

@ -18,8 +18,6 @@
*/ */
package org.elasticsearch.transport.netty; package org.elasticsearch.transport.netty;
import com.carrotsearch.hppc.IntHashSet;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cache.recycler.PageCacheRecycler; import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.Lifecycle;
@ -27,176 +25,115 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.junit.rule.RepeatOnExceptionRule;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
public class NettyTransportMultiPortTests extends ESTestCase { public class NettyTransportMultiPortTests extends ESTestCase {
private static final int MAX_RETRIES = 10;
private String host; private String host;
@Rule
public RepeatOnExceptionRule repeatOnBindExceptionRule = new RepeatOnExceptionRule(logger, MAX_RETRIES, BindTransportException.class);
@Before @Before
public void setup() { public void setup() {
if (randomBoolean()) {
host = "localhost";
} else {
if (NetworkUtils.SUPPORTS_V6 && randomBoolean()) { if (NetworkUtils.SUPPORTS_V6 && randomBoolean()) {
host = "::1"; host = "::1";
} else { } else {
host = "127.0.0.1"; host = "127.0.0.1";
} }
} }
}
public void testThatNettyCanBindToMultiplePorts() throws Exception { public void testThatNettyCanBindToMultiplePorts() throws Exception {
int[] ports = getRandomPorts(3);
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("network.host", host) .put("network.host", host)
.put("transport.tcp.port", ports[0]) .put("transport.tcp.port", 22) // will not actually bind to this
.put("transport.profiles.default.port", ports[1]) .put("transport.profiles.default.port", 0)
.put("transport.profiles.client1.port", ports[2]) .put("transport.profiles.client1.port", 0)
.build(); .build();
ThreadPool threadPool = new ThreadPool("tst"); ThreadPool threadPool = new ThreadPool("tst");
try (NettyTransport ignored = startNettyTransport(settings, threadPool)) { try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
assertConnectionRefused(ports[0]); assertEquals(1, transport.profileBoundAddresses().size());
assertPortIsBound(ports[1]); assertEquals(1, transport.boundAddress().boundAddresses().length);
assertPortIsBound(ports[2]);
} finally { } finally {
terminate(threadPool); terminate(threadPool);
} }
} }
public void testThatDefaultProfileInheritsFromStandardSettings() throws Exception { public void testThatDefaultProfileInheritsFromStandardSettings() throws Exception {
int[] ports = getRandomPorts(2);
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("network.host", host) .put("network.host", host)
.put("transport.tcp.port", ports[0]) .put("transport.tcp.port", 0)
.put("transport.profiles.client1.port", ports[1]) .put("transport.profiles.client1.port", 0)
.build(); .build();
ThreadPool threadPool = new ThreadPool("tst"); ThreadPool threadPool = new ThreadPool("tst");
try (NettyTransport ignored = startNettyTransport(settings, threadPool)) { try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
assertPortIsBound(ports[0]); assertEquals(1, transport.profileBoundAddresses().size());
assertPortIsBound(ports[1]); assertEquals(1, transport.boundAddress().boundAddresses().length);
} finally { } finally {
terminate(threadPool); terminate(threadPool);
} }
} }
public void testThatProfileWithoutPortSettingsFails() throws Exception { public void testThatProfileWithoutPortSettingsFails() throws Exception {
int[] ports = getRandomPorts(1);
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("network.host", host) .put("network.host", host)
.put("transport.tcp.port", ports[0]) .put("transport.tcp.port", 0)
.put("transport.profiles.client1.whatever", "foo") .put("transport.profiles.client1.whatever", "foo")
.build(); .build();
ThreadPool threadPool = new ThreadPool("tst"); ThreadPool threadPool = new ThreadPool("tst");
try (NettyTransport ignored = startNettyTransport(settings, threadPool)) { try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
assertPortIsBound(ports[0]); assertEquals(0, transport.profileBoundAddresses().size());
assertEquals(1, transport.boundAddress().boundAddresses().length);
} finally { } finally {
terminate(threadPool); terminate(threadPool);
} }
} }
public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exception { public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exception {
int[] ports = getRandomPorts(3);
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("network.host", host) .put("network.host", host)
.put("transport.tcp.port", ports[0]) .put("transport.tcp.port", 22) // will not actually bind to this
.put("transport.netty.port", ports[1]) .put("transport.netty.port", 23) // will not actually bind to this
.put("transport.profiles.default.port", ports[2]) .put("transport.profiles.default.port", 0)
.build(); .build();
ThreadPool threadPool = new ThreadPool("tst"); ThreadPool threadPool = new ThreadPool("tst");
try (NettyTransport ignored = startNettyTransport(settings, threadPool)) { try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
assertConnectionRefused(ports[0]); assertEquals(0, transport.profileBoundAddresses().size());
assertConnectionRefused(ports[1]); assertEquals(1, transport.boundAddress().boundAddresses().length);
assertPortIsBound(ports[2]);
} finally { } finally {
terminate(threadPool); terminate(threadPool);
} }
} }
public void testThatProfileWithoutValidNameIsIgnored() throws Exception { public void testThatProfileWithoutValidNameIsIgnored() throws Exception {
int[] ports = getRandomPorts(3);
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("network.host", host) .put("network.host", host)
.put("transport.tcp.port", ports[0]) .put("transport.tcp.port", 0)
// mimics someone trying to define a profile for .local which is the profile for a node request to itself // mimics someone trying to define a profile for .local which is the profile for a node request to itself
.put("transport.profiles." + TransportService.DIRECT_RESPONSE_PROFILE + ".port", ports[1]) .put("transport.profiles." + TransportService.DIRECT_RESPONSE_PROFILE + ".port", 22) // will not actually bind to this
.put("transport.profiles..port", ports[2]) .put("transport.profiles..port", 23) // will not actually bind to this
.build(); .build();
ThreadPool threadPool = new ThreadPool("tst"); ThreadPool threadPool = new ThreadPool("tst");
try (NettyTransport ignored = startNettyTransport(settings, threadPool)) { try (NettyTransport transport = startNettyTransport(settings, threadPool)) {
assertPortIsBound(ports[0]); assertEquals(0, transport.profileBoundAddresses().size());
assertConnectionRefused(ports[1]); assertEquals(1, transport.boundAddress().boundAddresses().length);
assertConnectionRefused(ports[2]);
} finally { } finally {
terminate(threadPool); terminate(threadPool);
} }
} }
private int[] getRandomPorts(int numberOfPorts) {
IntHashSet ports = new IntHashSet();
int nextPort = randomIntBetween(49152, 65535);
for (int i = 0; i < numberOfPorts; i++) {
boolean foundPortInRange = false;
while (!foundPortInRange) {
if (!ports.contains(nextPort)) {
logger.debug("looking to see if port [{}]is available", nextPort);
try (ServerSocket serverSocket = new ServerSocket()) {
// Set SO_REUSEADDR as we may bind here and not be able
// to reuse the address immediately without it.
serverSocket.setReuseAddress(NetworkUtils.defaultReuseAddress());
serverSocket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), nextPort));
// bind was a success
logger.debug("port [{}] available.", nextPort);
foundPortInRange = true;
ports.add(nextPort);
} catch (IOException e) {
// Do nothing
logger.debug("port [{}] not available.", e, nextPort);
}
}
nextPort = randomIntBetween(49152, 65535);
}
}
return ports.toArray();
}
private NettyTransport startNettyTransport(Settings settings, ThreadPool threadPool) { private NettyTransport startNettyTransport(Settings settings, ThreadPool threadPool) {
BigArrays bigArrays = new MockBigArrays(new PageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService()); BigArrays bigArrays = new MockBigArrays(new PageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService());
@ -206,36 +143,4 @@ public class NettyTransportMultiPortTests extends ESTestCase {
assertThat(nettyTransport.lifecycleState(), is(Lifecycle.State.STARTED)); assertThat(nettyTransport.lifecycleState(), is(Lifecycle.State.STARTED));
return nettyTransport; return nettyTransport;
} }
private void assertConnectionRefused(int port) throws Exception {
try {
trySocketConnection(new InetSocketTransportAddress(InetAddress.getByName(host), port).address());
fail("Expected to get exception when connecting to port " + port);
} catch (IOException e) {
// expected
logger.info("Got expected connection message {}", e.getMessage());
}
}
private void assertPortIsBound(int port) throws Exception {
assertPortIsBound(host, port);
}
private void assertPortIsBound(String host, int port) throws Exception {
logger.info("Trying to connect to [{}]:[{}]", host, port);
trySocketConnection(new InetSocketTransportAddress(InetAddress.getByName(host), port).address());
}
private void trySocketConnection(InetSocketAddress address) throws Exception {
try (Socket socket = new Socket()) {
logger.info("Connecting to {}", address);
socket.connect(address, 500);
assertThat(socket.isConnected(), is(true));
try (OutputStream os = socket.getOutputStream()) {
os.write("foo".getBytes(StandardCharsets.UTF_8));
os.flush();
}
}
}
} }

View File

@ -38,9 +38,7 @@ import static org.hamcrest.Matchers.containsString;
public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase { public class SimpleNettyTransportTests extends AbstractSimpleTransportTestCase {
@Override @Override
protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) { protected MockTransportService build(Settings settings, Version version, NamedWriteableRegistry namedWriteableRegistry) {
int startPort = 11000 + randomIntBetween(0, 255); settings = Settings.builder().put(settings).put("transport.tcp.port", "0").build();
int endPort = startPort + 10;
settings = Settings.builder().put(settings).put("transport.tcp.port", startPort + "-" + endPort).build();
MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version, namedWriteableRegistry), threadPool); MockTransportService transportService = new MockTransportService(settings, new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, version, namedWriteableRegistry), threadPool);
transportService.start(); transportService.start();
return transportService; return transportService;