Merge pull request #15402 from rjernst/port_zero

Change rest integ tests to not have hardcoded ports
This commit is contained in:
Ryan Ernst 2015-12-11 18:20:55 -08:00
commit 089a9e6977
10 changed files with 126 additions and 49 deletions

View File

@ -78,7 +78,7 @@ class RandomizedTestingTask extends DefaultTask {
@Input
String argLine = null
Map<String, String> systemProperties = new HashMap<>()
Map<String, Object> systemProperties = new HashMap<>()
PatternFilterable patternSet = new PatternSet()
RandomizedTestingTask() {
@ -100,7 +100,7 @@ class RandomizedTestingTask extends DefaultTask {
jvmArgs.add(argument)
}
void systemProperty(String property, String value) {
void systemProperty(String property, Object value) {
systemProperties.put(property, value)
}
@ -245,8 +245,8 @@ class RandomizedTestingTask extends DefaultTask {
exclude(name: excludePattern)
}
}
for (Map.Entry<String, String> prop : systemProperties) {
sysproperty key: prop.getKey(), value: prop.getValue()
for (Map.Entry<String, Object> prop : systemProperties) {
sysproperty key: prop.getKey(), value: prop.getValue().toString()
}
makeListeners()
}

View File

@ -33,10 +33,10 @@ class ClusterConfiguration {
int numNodes = 1
@Input
int baseHttpPort = 9400
int httpPort = 0
@Input
int baseTransportPort = 9500
int transportPort = 0
@Input
boolean daemonize = true
@ -55,7 +55,7 @@ class ClusterConfiguration {
@Input
Closure waitCondition = { NodeInfo node, AntBuilder ant ->
File tmpFile = new File(node.cwd, 'wait.success')
ant.get(src: "http://localhost:${node.httpPort()}",
ant.get(src: "http://${node.httpUri()}",
dest: tmpFile.toString(),
ignoreerrors: true, // do not fail on error, so logging buffers can be flushed by the wait task
retries: 10)

View File

@ -38,8 +38,10 @@ class ClusterFormationTasks {
/**
* Adds dependent tasks to the given task to start and stop a cluster with the given configuration.
*
* Returns an object that will resolve at execution time of the given task to a uri for the cluster.
*/
static void setup(Project project, Task task, ClusterConfiguration config) {
static Object setup(Project project, Task task, ClusterConfiguration config) {
if (task.getEnabled() == false) {
// no need to add cluster formation tasks if the task won't run!
return
@ -55,6 +57,9 @@ class ClusterFormationTasks {
Task wait = configureWaitTask("${task.name}#wait", project, nodes, startTasks)
task.dependsOn(wait)
// delay the resolution of the uri by wrapping in a closure, so it is not used until read for tests
return "${-> nodes[0].transportUri()}"
}
/** Adds a dependency on the given distribution */
@ -200,17 +205,24 @@ class ClusterFormationTasks {
/** Adds a task to write elasticsearch.yml for the given node configuration */
static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node) {
Map esConfig = [
'cluster.name' : node.clusterName,
'http.port' : node.httpPort(),
'transport.tcp.port' : node.transportPort(),
'pidfile' : node.pidFile,
'discovery.zen.ping.unicast.hosts': (0..<node.config.numNodes).collect{"127.0.0.1:${node.config.baseTransportPort + it}"}.join(','),
'path.repo' : "${node.homeDir}/repo",
'path.shared_data' : "${node.homeDir}/../",
// Define a node attribute so we can test that it exists
'node.testattr' : 'test',
'repositories.url.allowed_urls' : 'http://snapshot.test*'
'cluster.name' : node.clusterName,
'pidfile' : node.pidFile,
'path.repo' : "${node.homeDir}/repo",
'path.shared_data' : "${node.homeDir}/../",
// Define a node attribute so we can test that it exists
'node.testattr' : 'test',
'repositories.url.allowed_urls': 'http://snapshot.test*'
]
if (node.config.numNodes == 1) {
esConfig['http.port'] = node.config.httpPort
esConfig['transport.tcp.port'] = node.config.transportPort
} else {
// TODO: fix multi node so it doesn't use hardcoded prots
esConfig['http.port'] = 9400 + node.nodeNum
esConfig['transport.tcp.port'] = 9500 + node.nodeNum
esConfig['discovery.zen.ping.unicast.hosts'] = (0..<node.config.numNodes).collect{"localhost:${9500 + it}"}.join(',')
}
esConfig.putAll(node.config.settings)
Task writeConfig = project.tasks.create(name: name, type: DefaultTask, dependsOn: setup)
@ -400,7 +412,12 @@ class ClusterFormationTasks {
resourceexists {
file(file: node.pidFile.toString())
}
socket(server: '127.0.0.1', port: node.httpPort())
resourceexists {
file(file: node.httpPortsFile.toString())
}
resourceexists {
file(file: node.transportPortsFile.toString())
}
}
}
}
@ -444,6 +461,8 @@ class ClusterFormationTasks {
logger.error("|-----------------------------------------")
logger.error("| failure marker exists: ${node.failedMarker.exists()}")
logger.error("| pid file exists: ${node.pidFile.exists()}")
logger.error("| http ports file exists: ${node.httpPortsFile.exists()}")
logger.error("| transport ports file exists: ${node.transportPortsFile.exists()}")
// the waitfor failed, so dump any output we got (if info logging this goes directly to stdout)
logger.error("|\n| [ant output]")
node.buffer.toString('UTF-8').eachLine { line -> logger.error("| ${line}") }

View File

@ -43,6 +43,12 @@ class NodeInfo {
/** the pid file the node will use */
File pidFile
/** a file written by elasticsearch containing the ports of each bound address for http */
File httpPortsFile
/** a file written by elasticsearch containing the ports of each bound address for transport */
File transportPortsFile
/** elasticsearch home dir */
File homeDir
@ -92,6 +98,10 @@ class NodeInfo {
homeDir = homeDir(baseDir, config.distribution)
confDir = confDir(baseDir, config.distribution)
configFile = new File(confDir, 'elasticsearch.yml')
// even for rpm/deb, the logs are under home because we dont start with real services
File logsDir = new File(homeDir, 'logs')
httpPortsFile = new File(logsDir, 'http.ports')
transportPortsFile = new File(logsDir, 'transport.ports')
cwd = new File(baseDir, "cwd")
failedMarker = new File(cwd, 'run.failed')
startLog = new File(cwd, 'run.log')
@ -119,6 +129,7 @@ class NodeInfo {
'JAVA_HOME' : project.javaHome,
'ES_GC_OPTS': config.jvmArgs // we pass these with the undocumented gc opts so the argline can set gc, etc
]
args.add("-Des.tests.portsfile=true")
args.addAll(config.systemProperties.collect { key, value -> "-D${key}=${value}" })
for (Map.Entry<String, String> property : System.properties.entrySet()) {
if (property.getKey().startsWith('es.')) {
@ -159,14 +170,14 @@ class NodeInfo {
wrapperScript.setText("\"${esScript}\" ${argsPasser} > run.log 2>&1 ${exitMarker}", 'UTF-8')
}
/** Returns the http port for this node */
int httpPort() {
return config.baseHttpPort + nodeNum
/** Returns an address and port suitable for a uri to connect to this node over http */
String httpUri() {
return httpPortsFile.readLines("UTF-8").get(0)
}
/** Returns the transport port for this node */
int transportPort() {
return config.baseTransportPort + nodeNum
/** Returns an address and port suitable for a uri to connect to this node over transport protocol */
String transportUri() {
return transportPortsFile.readLines("UTF-8").get(0)
}
/** Returns the directory elasticsearch home is contained in for the given distribution */

View File

@ -57,12 +57,12 @@ public class RestIntegTestTask extends RandomizedTestingTask {
RestSpecHack.configureDependencies(project)
project.afterEvaluate {
dependsOn(RestSpecHack.configureTask(project, includePackaged))
systemProperty('tests.cluster', "localhost:${clusterConfig.baseTransportPort}")
}
// this must run after all projects have been configured, so we know any project
// references can be accessed as a fully configured
project.gradle.projectsEvaluated {
ClusterFormationTasks.setup(project, this, clusterConfig)
Object clusterUri = ClusterFormationTasks.setup(project, this, clusterConfig)
systemProperty('tests.cluster', clusterUri)
}
}

View File

@ -8,7 +8,7 @@ import org.gradle.util.ConfigureUtil
public class RunTask extends DefaultTask {
ClusterConfiguration clusterConfig = new ClusterConfiguration(baseHttpPort: 9200, baseTransportPort: 9300, daemonize: false)
ClusterConfiguration clusterConfig = new ClusterConfiguration(httpPort: 9200, transportPort: 9300, daemonize: false)
public RunTask() {
description = "Runs elasticsearch with '${project.path}'"

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Injector;
@ -42,11 +43,14 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoveryService;
@ -59,6 +63,7 @@ import org.elasticsearch.gateway.GatewayModule;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.http.HttpServerModule;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
@ -97,7 +102,16 @@ import org.elasticsearch.tribe.TribeService;
import org.elasticsearch.watcher.ResourceWatcherModule;
import org.elasticsearch.watcher.ResourceWatcherService;
import java.io.BufferedWriter;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -274,6 +288,15 @@ public class Node implements Releasable {
injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(TribeService.class).start();
if (System.getProperty("es.tests.portsfile", "false").equals("true")) {
if (settings.getAsBoolean("http.enabled", true)) {
HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
writePortsFile("http", http.boundAddress());
}
TransportService transport = injector.getInstance(TransportService.class);
writePortsFile("transport", transport.boundAddress());
}
logger.info("started");
return this;
@ -425,4 +448,27 @@ public class Node implements Releasable {
public Injector injector() {
return this.injector;
}
/** Writes a file to the logs dir containing the ports for the given transport type */
private void writePortsFile(String type, BoundTransportAddress boundAddress) {
Path tmpPortsFile = environment.logsFile().resolve(type + ".ports.tmp");
try (BufferedWriter writer = Files.newBufferedWriter(tmpPortsFile, Charset.forName("UTF-8"))) {
for (TransportAddress address : boundAddress.boundAddresses()) {
InetAddress inetAddress = InetAddress.getByName(address.getAddress());
if (inetAddress instanceof Inet6Address && inetAddress.isLinkLocalAddress()) {
// no link local, just causes problems
continue;
}
writer.write(NetworkAddress.formatAddress(new InetSocketAddress(inetAddress, address.getPort())) + "\n");
}
} catch (IOException e) {
throw new RuntimeException("Failed to write ports file", e);
}
Path portsFile = environment.logsFile().resolve(type + ".ports");
try {
Files.move(tmpPortsFile, portsFile, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
throw new RuntimeException("Failed to rename ports file", e);
}
}
}

View File

@ -87,6 +87,7 @@ import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@ -763,6 +764,11 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
// close the channel as safe measure, which will cause a node to be disconnected if relevant
ctx.getChannel().close();
disconnectFromNodeChannel(ctx.getChannel(), e.getCause());
} else if (e.getCause() instanceof BindException) {
logger.trace("bind exception caught on transport layer [{}]", e.getCause(), ctx.getChannel());
// close the channel as safe measure, which will cause a node to be disconnected if relevant
ctx.getChannel().close();
disconnectFromNodeChannel(ctx.getChannel(), e.getCause());
} else if (e.getCause() instanceof CancelledKeyException) {
logger.trace("cancelled key exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), ctx.getChannel());
// close the channel as safe measure, which will cause a node to be disconnected if relevant

View File

@ -20,6 +20,7 @@
package org.elasticsearch.smoketest;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.SuppressForbidden;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
@ -34,7 +35,10 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.Locale;
@ -103,20 +107,14 @@ public abstract class ESSmokeClientTestCase extends LuceneTestCase {
return client;
}
private static Client startClient() throws UnknownHostException {
private static Client startClient() throws IOException {
String[] stringAddresses = clusterAddresses.split(",");
TransportAddress[] transportAddresses = new TransportAddress[stringAddresses.length];
int i = 0;
for (String stringAddress : stringAddresses) {
String[] split = stringAddress.split(":");
if (split.length < 2) {
throw new IllegalArgumentException("address [" + clusterAddresses + "] not valid");
}
try {
transportAddresses[i++] = new InetSocketTransportAddress(InetAddress.getByName(split[0]), Integer.valueOf(split[1]));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("port is not valid, expected number but was [" + split[1] + "]");
}
URL url = new URL("http://" + stringAddress);
InetAddress inetAddress = InetAddress.getByName(url.getHost());
transportAddresses[i++] = new InetSocketTransportAddress(new InetSocketAddress(inetAddress, url.getPort()));
}
return startClient(createTempDir(), transportAddresses);
}
@ -125,7 +123,7 @@ public abstract class ESSmokeClientTestCase extends LuceneTestCase {
if (client == null) {
try {
client = startClient();
} catch (UnknownHostException e) {
} catch (IOException e) {
logger.error("can not start the client", e);
}
assertThat(client, notNullValue());

View File

@ -28,6 +28,7 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.http.impl.client.HttpClients;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.SuppressForbidden;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
@ -136,6 +137,8 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
@ -1727,20 +1730,14 @@ public abstract class ESIntegTestCase extends ESTestCase {
return Settings.EMPTY;
}
private ExternalTestCluster buildExternalCluster(String clusterAddresses) throws UnknownHostException {
private ExternalTestCluster buildExternalCluster(String clusterAddresses) throws IOException {
String[] stringAddresses = clusterAddresses.split(",");
TransportAddress[] transportAddresses = new TransportAddress[stringAddresses.length];
int i = 0;
for (String stringAddress : stringAddresses) {
String[] split = stringAddress.split(":");
if (split.length < 2) {
throw new IllegalArgumentException("address [" + clusterAddresses + "] not valid");
}
try {
transportAddresses[i++] = new InetSocketTransportAddress(InetAddress.getByName(split[0]), Integer.valueOf(split[1]));
} catch (NumberFormatException e) {
throw new IllegalArgumentException("port is not valid, expected number but was [" + split[1] + "]");
}
URL url = new URL("http://" + stringAddress);
InetAddress inetAddress = InetAddress.getByName(url.getHost());
transportAddresses[i++] = new InetSocketTransportAddress(new InetSocketAddress(inetAddress, url.getPort()));
}
return new ExternalTestCluster(createTempDir(), externalClusterClientSettings(), transportClientPlugins(), transportAddresses);
}