Build: Change rest integ tests to not have hardcoded ports

This change removes hardcoded ports from cluster formation. It passes
port 0 for http and transport, and then uses a special property to have
the node log the ports used for http and transport (just for tests).
This does not yet work for multi node tests. This brings us one step
closer to working with --parallel.
This commit is contained in:
Ryan Ernst 2015-12-11 17:36:08 -08:00
parent a42823ad45
commit 5c8a0da1fd
10 changed files with 126 additions and 49 deletions

View File

@ -78,7 +78,7 @@ class RandomizedTestingTask extends DefaultTask {
@Input
String argLine = null
Map<String, String> systemProperties = new HashMap<>()
Map<String, Object> systemProperties = new HashMap<>()
PatternFilterable patternSet = new PatternSet()
RandomizedTestingTask() {
@ -100,7 +100,7 @@ class RandomizedTestingTask extends DefaultTask {
jvmArgs.add(argument)
}
void systemProperty(String property, String value) {
void systemProperty(String property, Object value) {
systemProperties.put(property, value)
}
@ -245,8 +245,8 @@ class RandomizedTestingTask extends DefaultTask {
exclude(name: excludePattern)
}
}
for (Map.Entry<String, String> prop : systemProperties) {
sysproperty key: prop.getKey(), value: prop.getValue()
for (Map.Entry<String, Object> prop : systemProperties) {
sysproperty key: prop.getKey(), value: prop.getValue().toString()
}
makeListeners()
}

View File

@ -33,10 +33,10 @@ class ClusterConfiguration {
int numNodes = 1
@Input
int baseHttpPort = 9400
int httpPort = 0
@Input
int baseTransportPort = 9500
int transportPort = 0
@Input
boolean daemonize = true
@ -55,7 +55,7 @@ class ClusterConfiguration {
@Input
Closure waitCondition = { NodeInfo node, AntBuilder ant ->
File tmpFile = new File(node.cwd, 'wait.success')
ant.get(src: "http://localhost:${node.httpPort()}",
ant.get(src: "http://${node.httpUri()}",
dest: tmpFile.toString(),
ignoreerrors: true, // do not fail on error, so logging buffers can be flushed by the wait task
retries: 10)

View File

@ -38,8 +38,10 @@ class ClusterFormationTasks {
/**
* Adds dependent tasks to the given task to start and stop a cluster with the given configuration.
*
* Returns an object that will resolve at execution time of the given task to a uri for the cluster.
*/
static void setup(Project project, Task task, ClusterConfiguration config) {
static Object setup(Project project, Task task, ClusterConfiguration config) {
if (task.getEnabled() == false) {
// no need to add cluster formation tasks if the task won't run!
return
@ -55,6 +57,9 @@ class ClusterFormationTasks {
Task wait = configureWaitTask("${task.name}#wait", project, nodes, startTasks)
task.dependsOn(wait)
// delay the resolution of the uri by wrapping in a closure, so it is not used until read for tests
return "${-> nodes[0].transportUri()}"
}
/** Adds a dependency on the given distribution */
@ -200,17 +205,24 @@ class ClusterFormationTasks {
/** Adds a task to write elasticsearch.yml for the given node configuration */
static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node) {
Map esConfig = [
'cluster.name' : node.clusterName,
'http.port' : node.httpPort(),
'transport.tcp.port' : node.transportPort(),
'pidfile' : node.pidFile,
'discovery.zen.ping.unicast.hosts': (0..<node.config.numNodes).collect{"127.0.0.1:${node.config.baseTransportPort + it}"}.join(','),
'path.repo' : "${node.homeDir}/repo",
'path.shared_data' : "${node.homeDir}/../",
// Define a node attribute so we can test that it exists
'node.testattr' : 'test',
'repositories.url.allowed_urls' : 'http://snapshot.test*'
'cluster.name' : node.clusterName,
'pidfile' : node.pidFile,
'path.repo' : "${node.homeDir}/repo",
'path.shared_data' : "${node.homeDir}/../",
// Define a node attribute so we can test that it exists
'node.testattr' : 'test',
'repositories.url.allowed_urls': 'http://snapshot.test*'
]
if (node.config.numNodes == 1) {
esConfig['http.port'] = node.config.httpPort
esConfig['transport.tcp.port'] = node.config.transportPort
} else {
// TODO: fix multi node so it doesn't use hardcoded prots
esConfig['http.port'] = 9400 + node.nodeNum
esConfig['transport.tcp.port'] = 9500 + node.nodeNum
esConfig['discovery.zen.ping.unicast.hosts'] = (0..<node.config.numNodes).collect{"localhost:${9500 + it}"}.join(',')
}
esConfig.putAll(node.config.settings)
Task writeConfig = project.tasks.create(name: name, type: DefaultTask, dependsOn: setup)
@ -400,7 +412,12 @@ class ClusterFormationTasks {
resourceexists {
file(file: node.pidFile.toString())
}
socket(server: '127.0.0.1', port: node.httpPort())
resourceexists {
file(file: node.httpPortsFile.toString())
}
resourceexists {
file(file: node.transportPortsFile.toString())
}
}
}
}
@ -444,6 +461,8 @@ class ClusterFormationTasks {
logger.error("|-----------------------------------------")
logger.error("| failure marker exists: ${node.failedMarker.exists()}")
logger.error("| pid file exists: ${node.pidFile.exists()}")
logger.error("| http ports file exists: ${node.httpPortsFile.exists()}")
logger.error("| transport ports file exists: ${node.transportPortsFile.exists()}")
// the waitfor failed, so dump any output we got (if info logging this goes directly to stdout)
logger.error("|\n| [ant output]")
node.buffer.toString('UTF-8').eachLine { line -> logger.error("| ${line}") }

View File

@ -43,6 +43,12 @@ class NodeInfo {
/** the pid file the node will use */
File pidFile
/** a file written by elasticsearch containing the ports of each bound address for http */
File httpPortsFile
/** a file written by elasticsearch containing the ports of each bound address for transport */
File transportPortsFile
/** elasticsearch home dir */
File homeDir
@ -92,6 +98,10 @@ class NodeInfo {
homeDir = homeDir(baseDir, config.distribution)
confDir = confDir(baseDir, config.distribution)
configFile = new File(confDir, 'elasticsearch.yml')
// even for rpm/deb, the logs are under home because we dont start with real services
File logsDir = new File(homeDir, 'logs')
httpPortsFile = new File(logsDir, 'http.ports')
transportPortsFile = new File(logsDir, 'transport.ports')
cwd = new File(baseDir, "cwd")
failedMarker = new File(cwd, 'run.failed')
startLog = new File(cwd, 'run.log')
@ -119,6 +129,7 @@ class NodeInfo {
'JAVA_HOME' : project.javaHome,
'ES_GC_OPTS': config.jvmArgs // we pass these with the undocumented gc opts so the argline can set gc, etc
]
args.add("-Des.tests.portsfile=true")
args.addAll(config.systemProperties.collect { key, value -> "-D${key}=${value}" })
for (Map.Entry<String, String> property : System.properties.entrySet()) {
if (property.getKey().startsWith('es.')) {
@ -159,14 +170,14 @@ class NodeInfo {
wrapperScript.setText("\"${esScript}\" ${argsPasser} > run.log 2>&1 ${exitMarker}", 'UTF-8')
}
/** Returns the http port for this node */
int httpPort() {
return config.baseHttpPort + nodeNum
/** Returns an address and port suitable for a uri to connect to this node over http */
String httpUri() {
return httpPortsFile.readLines("UTF-8").get(0)
}
/** Returns the transport port for this node */
int transportPort() {
return config.baseTransportPort + nodeNum
/** Returns an address and port suitable for a uri to connect to this node over transport protocol */
String transportUri() {
return transportPortsFile.readLines("UTF-8").get(0)
}
/** Returns the directory elasticsearch home is contained in for the given distribution */

View File

@ -57,12 +57,12 @@ public class RestIntegTestTask extends RandomizedTestingTask {
RestSpecHack.configureDependencies(project)
project.afterEvaluate {
dependsOn(RestSpecHack.configureTask(project, includePackaged))
systemProperty('tests.cluster', "localhost:${clusterConfig.baseTransportPort}")
}
// this must run after all projects have been configured, so we know any project
// references can be accessed as a fully configured
project.gradle.projectsEvaluated {
ClusterFormationTasks.setup(project, this, clusterConfig)
Object clusterUri = ClusterFormationTasks.setup(project, this, clusterConfig)
systemProperty('tests.cluster', clusterUri)
}
}

View File

@ -8,7 +8,7 @@ import org.gradle.util.ConfigureUtil
public class RunTask extends DefaultTask {
ClusterConfiguration clusterConfig = new ClusterConfiguration(baseHttpPort: 9200, baseTransportPort: 9300, daemonize: false)
ClusterConfiguration clusterConfig = new ClusterConfiguration(httpPort: 9200, transportPort: 9300, daemonize: false)
public RunTask() {
description = "Runs elasticsearch with '${project.path}'"

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Injector;
@ -42,11 +43,14 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoveryService;
@ -59,6 +63,7 @@ import org.elasticsearch.gateway.GatewayModule;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.http.HttpServerModule;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
@ -97,7 +102,15 @@ import org.elasticsearch.tribe.TribeService;
import org.elasticsearch.watcher.ResourceWatcherModule;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.io.BufferedWriter;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -274,6 +287,15 @@ public class Node implements Releasable {
injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(TribeService.class).start();
if (System.getProperty("es.tests.portsfile", "false").equals("true")) {
if (settings.getAsBoolean("http.enabled", true)) {
HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
writePortsFile("http", http.boundAddress());
}
TransportService transport = injector.getInstance(TransportService.class);
writePortsFile("transport", transport.boundAddress());
}
logger.info("started");
return this;
@ -425,4 +447,28 @@ public class Node implements Releasable {
public Injector injector() {
return this.injector;
}
@SuppressForbidden(reason = "Need to specify port and pass to formatter")
private void writePortsFile(String type, BoundTransportAddress boundAddress) {
Path tmpPortsFile = environment.logsFile().resolve(type + ".ports.tmp");
try (BufferedWriter writer = Files.newBufferedWriter(tmpPortsFile, Charset.forName("UTF-8"))) {
for (TransportAddress address : boundAddress.boundAddresses()) {
InetSocketAddress socketAddress = new InetSocketAddress(address.getAddress(), address.getPort());
if (socketAddress.getAddress() instanceof Inet6Address &&
socketAddress.getAddress().isLinkLocalAddress()) {
// no link local, just causes problems
continue;
}
writer.write(NetworkAddress.formatAddress(socketAddress) + "\n");
}
} catch (IOException e) {
throw new RuntimeException("Failed to write ports file", e);
}
Path portsFile = environment.logsFile().resolve(type + ".ports");
try {
Files.move(tmpPortsFile, portsFile, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
throw new RuntimeException("Failed to rename ports file", e);
}
}
}

View File

@ -87,6 +87,7 @@ import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -763,6 +764,11 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
// close the channel as safe measure, which will cause a node to be disconnected if relevant
ctx.getChannel().close();
disconnectFromNodeChannel(ctx.getChannel(), e.getCause());
} else if (e.getCause() instanceof BindException) {
logger.trace("bind exception caught on transport layer [{}]", e.getCause(), ctx.getChannel());
// close the channel as safe measure, which will cause a node to be disconnected if relevant
ctx.getChannel().close();
disconnectFromNodeChannel(ctx.getChannel(), e.getCause());
} else if (e.getCause() instanceof CancelledKeyException) {
logger.trace("cancelled key exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), ctx.getChannel());
// close the channel as safe measure, which will cause a node to be disconnected if relevant

View File

@ -20,6 +20,7 @@
package org.elasticsearch.smoketest;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.SuppressForbidden;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
@ -34,7 +35,10 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.Locale;
@ -103,20 +107,14 @@ public abstract class ESSmokeClientTestCase extends LuceneTestCase {
return client;
}
private static Client startClient() throws UnknownHostException {
@SuppressForbidden(reason = "Must use port when creating address")
private static Client startClient() throws IOException {
String[] stringAddresses = clusterAddresses.split(",");
TransportAddress[] transportAddresses = new TransportAddress[stringAddresses.length];
int i = 0;
for (String stringAddress : stringAddresses) {
String[] split = stringAddress.split(":");
if (split.length < 2) {
throw new IllegalArgumentException("address [" + clusterAddresses + "] not valid");
}
try {
transportAddresses[i++] = new InetSocketTransportAddress(InetAddress.getByName(split[0]), Integer.valueOf(split[1]));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("port is not valid, expected number but was [" + split[1] + "]");
}
URL url = new URL("http://" + stringAddress);
transportAddresses[i++] = new InetSocketTransportAddress(new InetSocketAddress(url.getHost(), url.getPort()));
}
return startClient(createTempDir(), transportAddresses);
}
@ -125,7 +123,7 @@ public abstract class ESSmokeClientTestCase extends LuceneTestCase {
if (client == null) {
try {
client = startClient();
} catch (UnknownHostException e) {
} catch (IOException e) {
logger.error("can not start the client", e);
}
assertThat(client, notNullValue());

View File

@ -28,6 +28,7 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.http.impl.client.HttpClients;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.SuppressForbidden;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
@ -136,6 +137,8 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
@ -1727,20 +1730,14 @@ public abstract class ESIntegTestCase extends ESTestCase {
return Settings.EMPTY;
}
private ExternalTestCluster buildExternalCluster(String clusterAddresses) throws UnknownHostException {
@SuppressForbidden(reason = "Must use port when creating address")
private ExternalTestCluster buildExternalCluster(String clusterAddresses) throws IOException {
String[] stringAddresses = clusterAddresses.split(",");
TransportAddress[] transportAddresses = new TransportAddress[stringAddresses.length];
int i = 0;
for (String stringAddress : stringAddresses) {
String[] split = stringAddress.split(":");
if (split.length < 2) {
throw new IllegalArgumentException("address [" + clusterAddresses + "] not valid");
}
try {
transportAddresses[i++] = new InetSocketTransportAddress(InetAddress.getByName(split[0]), Integer.valueOf(split[1]));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("port is not valid, expected number but was [" + split[1] + "]");
}
URL url = new URL("http://" + stringAddress);
transportAddresses[i++] = new InetSocketTransportAddress(new InetSocketAddress(url.getHost(), url.getPort()));
}
return new ExternalTestCluster(createTempDir(), externalClusterClientSettings(), transportClientPlugins(), transportAddresses);
}