Merge pull request #14959 from rjernst/cluster_waiting

Allow customizing wait condition and cluster settings
This commit is contained in:
Ryan Ernst 2015-11-24 08:24:00 -08:00
commit f0e780bf86
5 changed files with 212 additions and 114 deletions

View File

@@ -31,10 +31,10 @@ class ClusterConfiguration {
int numNodes = 1
@Input
int httpPort = 9400
int baseHttpPort = 9400
@Input
int transportPort = 9500
int baseTransportPort = 9500
@Input
boolean daemonize = true
@@ -45,22 +45,44 @@ class ClusterConfiguration {
@Input
String jvmArgs = System.getProperty('tests.jvm.argline', '')
/**
* A closure to call before the cluster is considered ready. The closure is passed the node info,
* as well as a groovy AntBuilder, to enable running ant condition checks. The default wait
* condition is for http on the http port.
*/
@Input
Closure waitCondition = { NodeInfo node, AntBuilder ant ->
File tmpFile = new File(node.cwd, 'wait.success')
ant.get(src: "http://localhost:${node.httpPort()}",
dest: tmpFile.toString(),
ignoreerrors: true, // do not fail on error, so logging buffers can be flushed by the wait task
retries: 10)
return tmpFile.exists()
}
Map<String, String> systemProperties = new HashMap<>()
Map<String, String> settings = new HashMap<>()
LinkedHashMap<String, FileCollection> plugins = new LinkedHashMap<>()
LinkedHashMap<String, Object[]> setupCommands = new LinkedHashMap<>()
@Input
void plugin(String name, FileCollection file) {
plugins.put(name, file)
}
@Input
void systemProperty(String property, String value) {
systemProperties.put(property, value)
}
@Input
void setting(String name, String value) {
settings.put(name, value)
}
@Input
void plugin(String name, FileCollection file) {
plugins.put(name, file)
}
@Input
void setupCommand(String name, Object... args) {
setupCommands.put(name, args)

View File

@@ -23,6 +23,7 @@ import org.apache.tools.ant.taskdefs.condition.Os
import org.elasticsearch.gradle.VersionProperties
import org.gradle.api.*
import org.gradle.api.file.FileCollection
import org.gradle.api.logging.Logger
import org.gradle.api.tasks.Copy
import org.gradle.api.tasks.Delete
import org.gradle.api.tasks.Exec
@@ -34,87 +35,6 @@ import java.nio.file.Paths
*/
class ClusterFormationTasks {
static class NodeInfo {
/** common configuration for all nodes, including this one */
ClusterConfiguration config
/** node number within the cluster, for creating unique names and paths */
int nodeNum
/** name of the cluster this node is part of */
String clusterName
/** root directory all node files and operations happen under */
File baseDir
/** the pid file the node will use */
File pidFile
/** elasticsearch home dir */
File homeDir
/** working directory for the node process */
File cwd
/** file that if it exists, indicates the node failed to start */
File failedMarker
/** stdout/stderr log of the elasticsearch process for this node */
File startLog
/** directory to install plugins from */
File pluginsTmpDir
/** environment variables to start the node with */
Map<String, String> env
/** arguments to start the node with */
List<String> args
/** Path to the elasticsearch start script */
String esScript
/** buffer for ant output when starting this node */
ByteArrayOutputStream buffer = new ByteArrayOutputStream()
/** Creates a node to run as part of a cluster for the given task */
NodeInfo(ClusterConfiguration config, int nodeNum, Project project, Task task) {
this.config = config
this.nodeNum = nodeNum
clusterName = "${task.path.replace(':', '_').substring(1)}"
baseDir = new File(project.buildDir, "cluster/${task.name} node${nodeNum}")
pidFile = new File(baseDir, 'es.pid')
homeDir = homeDir(baseDir, config.distribution)
cwd = new File(baseDir, "cwd")
failedMarker = new File(cwd, 'run.failed')
startLog = new File(cwd, 'run.log')
pluginsTmpDir = new File(baseDir, "plugins tmp")
env = [
'JAVA_HOME' : project.javaHome,
'ES_GC_OPTS': config.jvmArgs // we pass these with the undocumented gc opts so the argline can set gc, etc
]
args = config.systemProperties.collect { key, value -> "-D${key}=${value}" }
for (Map.Entry<String, String> property : System.properties.entrySet()) {
if (property.getKey().startsWith('es.')) {
args.add("-D${property.getKey()}=${property.getValue()}")
}
}
// running with cmd on windows will look for this with the .bat extension
esScript = new File(homeDir, 'bin/elasticsearch').toString()
}
/** Returns debug string for the command that started this node. */
String getCommandString() {
String esCommandString = "Elasticsearch node ${nodeNum} command: ${esScript} "
esCommandString += args.join(' ')
esCommandString += '\nenvironment:'
env.each { k, v -> esCommandString += "\n ${k}: ${v}" }
return esCommandString
}
/** Returns the directory elasticsearch home is contained in for the given distribution */
static File homeDir(File baseDir, String distro) {
String path
switch (distro) {
case 'zip':
case 'tar':
path = "elasticsearch-${VersionProperties.elasticsearch}"
break;
default:
throw new InvalidUserDataException("Unknown distribution: ${distro}")
}
return new File(baseDir, path)
}
}
/**
* Adds dependent tasks to the given task to start and stop a cluster with the given configuration.
*/
@@ -194,7 +114,10 @@ class ClusterFormationTasks {
// extra setup commands
for (Map.Entry<String, Object[]> command : node.config.setupCommands.entrySet()) {
setup = configureExecTask(taskName(task, node, command.getKey()), project, setup, node, command.getValue())
// the first argument is the actual script name, relative to home
Object[] args = command.getValue().clone()
args[0] = new File(node.homeDir, args[0].toString())
setup = configureExecTask(taskName(task, node, command.getKey()), project, setup, node, args)
}
Task start = configureStartTask(taskName(task, node, 'start'), project, setup, node)
@@ -236,10 +159,10 @@
static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node) {
Map esConfig = [
'cluster.name' : node.clusterName,
'http.port' : node.config.httpPort + node.nodeNum,
'transport.tcp.port' : node.config.transportPort + node.nodeNum,
'http.port' : node.httpPort(),
'transport.tcp.port' : node.transportPort(),
'pidfile' : node.pidFile,
'discovery.zen.ping.unicast.hosts': (0..<node.config.numNodes).collect{"127.0.0.1:${node.config.transportPort + it}"}.join(','),
'discovery.zen.ping.unicast.hosts': (0..<node.config.numNodes).collect{"127.0.0.1:${node.config.baseTransportPort + it}"}.join(','),
'path.repo' : "${node.homeDir}/repo",
'path.shared_data' : "${node.homeDir}/../",
// Define a node attribute so we can test that it exists
@@ -305,7 +228,7 @@
}
// this closure is converted into ant nodes by groovy's AntBuilder
Closure antRunner = {
Closure antRunner = { AntBuilder ant ->
// we must add debug options inside the closure so the config is read at execution time, as
// gradle task options are not processed until the end of the configuration phase
if (node.config.debug) {
@@ -334,7 +257,7 @@
script = wrapperScript.toString()
}
exec(executable: executable, spawn: node.config.daemonize, dir: node.cwd, taskname: 'elasticsearch') {
ant.exec(executable: executable, spawn: node.config.daemonize, dir: node.cwd, taskname: 'elasticsearch') {
node.env.each { key, value -> env(key: key, value: value) }
arg(value: script)
node.args.each { arg(value: it) }
@@ -347,7 +270,6 @@
node.getCommandString().eachLine { line -> logger.info(line) }
if (logger.isInfoEnabled() || node.config.daemonize == false) {
// run with piping streams directly out (even stderr to stdout since gradle would capture it)
runAntCommand(project, antRunner, System.out, System.err)
} else {
// buffer the output, we may not need to print it
@@ -376,7 +298,7 @@
resourceexists {
file(file: node.pidFile.toString())
}
http(url: "http://localhost:${node.config.httpPort + node.nodeNum}")
socket(server: '127.0.0.1', port: node.httpPort())
}
}
}
@@ -386,26 +308,48 @@
anyNodeFailed |= node.failedMarker.exists()
}
if (ant.properties.containsKey("failed${name}".toString()) || anyNodeFailed) {
for (NodeInfo node : nodes) {
if (logger.isInfoEnabled() == false) {
// We already log the command at info level. No need to do it twice.
node.getCommandString().eachLine { line -> logger.error(line) }
}
// the waitfor failed, so dump any output we got (may be empty if info logging, but that is ok)
logger.error("Node ${node.nodeNum} ant output:")
node.buffer.toString('UTF-8').eachLine { line -> logger.error(line) }
// also dump the log file for the startup script (which will include ES logging output to stdout)
if (node.startLog.exists()) {
logger.error("Node ${node.nodeNum} log:")
node.startLog.eachLine { line -> logger.error(line) }
}
waitFailed(nodes, logger, 'Failed to start elasticsearch')
}
// go through each node checking the wait condition
for (NodeInfo node : nodes) {
// first bind node info to the closure, then pass to the ant runner so we can get good logging
Closure antRunner = node.config.waitCondition.curry(node)
boolean success
if (logger.isInfoEnabled()) {
success = runAntCommand(project, antRunner, System.out, System.err)
} else {
PrintStream captureStream = new PrintStream(node.buffer, true, "UTF-8")
success = runAntCommand(project, antRunner, captureStream, captureStream)
}
if (success == false) {
waitFailed(nodes, logger, 'Elasticsearch cluster failed to pass wait condition')
}
throw new GradleException('Failed to start elasticsearch')
}
}
return wait
}
/**
 * Dumps diagnostics for every node of the cluster and then fails the build.
 *
 * For each node this logs (at error level): the start command when it was not
 * already shown at info level, the buffered ant output captured while starting
 * the node, and the startup script log when it exists. Finally throws a
 * GradleException carrying the supplied message.
 */
static void waitFailed(List<NodeInfo> nodes, Logger logger, String msg) {
    nodes.each { node ->
        if (!logger.isInfoEnabled()) {
            // We already log the command at info level. No need to do it twice.
            node.getCommandString().eachLine { logger.error(it) }
        }
        // the waitfor failed, so dump any output we got (may be empty if info logging, but that is ok)
        logger.error("Node ${node.nodeNum} ant output:")
        node.buffer.toString('UTF-8').eachLine { logger.error(it) }
        // also dump the log file for the startup script (which will include ES logging output to stdout)
        if (node.startLog.exists()) {
            logger.error("Node ${node.nodeNum} log:")
            node.startLog.eachLine { logger.error(it) }
        }
    }
    throw new GradleException(msg)
}
/** Adds a task to check if the process with the given pidfile is actually elasticsearch */
static Task configureCheckPreviousTask(String name, Project project, Object depends, NodeInfo node) {
return project.tasks.create(name: name, type: Exec, dependsOn: depends) {
@@ -475,14 +419,15 @@ class ClusterFormationTasks {
}
/**
 * Runs an ant command, sending output to the given out and error streams.
 *
 * The command closure is invoked with the project's AntBuilder, and its return
 * value is propagated so callers (e.g. wait conditions) can inspect the result.
 * A DefaultLogger is attached for the duration of the call so ant output goes to
 * the supplied streams at MSG_INFO level.
 *
 * Fixes two issues in the merged residue:
 * - removes the stale old-style `project.configure(project.ant, command)` line that
 *   survived the diff alongside its replacement `command(project.ant)`
 * - detaches the build listener in a finally block so a throwing closure cannot
 *   leak the listener and double-log subsequent ant invocations
 */
static Object runAntCommand(Project project, Closure command, PrintStream outputStream, PrintStream errorStream) {
    DefaultLogger listener = new DefaultLogger(
        errorPrintStream: errorStream,
        outputPrintStream: outputStream,
        messageOutputLevel: org.apache.tools.ant.Project.MSG_INFO)
    project.ant.project.addBuildListener(listener)
    try {
        return command(project.ant)
    } finally {
        project.ant.project.removeBuildListener(listener)
    }
}
}

View File

@@ -0,0 +1,131 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gradle.test
import org.elasticsearch.gradle.VersionProperties
import org.gradle.api.InvalidUserDataException
import org.gradle.api.Project
import org.gradle.api.Task
/**
* A container for the files and configuration associated with a single node in a test cluster.
*/
/**
 * A container for the files and configuration associated with a single node in a test cluster.
 */
class NodeInfo {
    /** common configuration for all nodes, including this one */
    ClusterConfiguration config
    /** node number within the cluster, for creating unique names and paths */
    int nodeNum
    /** name of the cluster this node is part of */
    String clusterName
    /** root directory all node files and operations happen under */
    File baseDir
    /** the pid file the node will use */
    File pidFile
    /** elasticsearch home dir */
    File homeDir
    /** working directory for the node process */
    File cwd
    /** file that if it exists, indicates the node failed to start */
    File failedMarker
    /** stdout/stderr log of the elasticsearch process for this node */
    File startLog
    /** directory to install plugins from */
    File pluginsTmpDir
    /** environment variables to start the node with */
    Map<String, String> env
    /** arguments to start the node with */
    List<String> args
    /** Path to the elasticsearch start script */
    String esScript
    /** buffer for ant output when starting this node */
    ByteArrayOutputStream buffer = new ByteArrayOutputStream()

    /** Creates a node to run as part of a cluster for the given task */
    NodeInfo(ClusterConfiguration config, int nodeNum, Project project, Task task) {
        this.config = config
        this.nodeNum = nodeNum
        // derive a unique cluster name from the task path, e.g. ':x:integTest' -> 'x_integTest'
        clusterName = "${task.path.replace(':', '_').substring(1)}"
        baseDir = new File(project.buildDir, "cluster/${task.name} node${nodeNum}")
        pidFile = new File(baseDir, 'es.pid')
        homeDir = homeDir(baseDir, config.distribution)
        cwd = new File(baseDir, "cwd")
        failedMarker = new File(cwd, 'run.failed')
        startLog = new File(cwd, 'run.log')
        pluginsTmpDir = new File(baseDir, "plugins tmp")
        env = [
            'JAVA_HOME' : project.javaHome,
            'ES_GC_OPTS': config.jvmArgs // we pass these with the undocumented gc opts so the argline can set gc, etc
        ]
        // build the -D argument list: configured system properties first, then any
        // es.* properties passed on the gradle command line
        args = []
        config.systemProperties.each { key, value ->
            args.add("-D${key}=${value}")
        }
        System.properties.each { key, value ->
            if (key.toString().startsWith('es.')) {
                args.add("-D${key}=${value}")
            }
        }
        // running with cmd on windows will look for this with the .bat extension
        esScript = new File(homeDir, 'bin/elasticsearch').toString()
    }

    /** Returns debug string for the command that started this node. */
    String getCommandString() {
        StringBuilder description = new StringBuilder("Elasticsearch node ${nodeNum} command: ${esScript} ")
        description.append(args.join(' '))
        description.append('\nenvironment:')
        env.each { key, value -> description.append("\n ${key}: ${value}") }
        return description.toString()
    }

    /** Returns the http port for this node, offset from the configured base port by the node number */
    int httpPort() {
        return config.baseHttpPort + nodeNum
    }

    /** Returns the transport port for this node, offset from the configured base port by the node number */
    int transportPort() {
        return config.baseTransportPort + nodeNum
    }

    /** Returns the directory elasticsearch home is contained in for the given distribution */
    static File homeDir(File baseDir, String distro) {
        // zip and tar distributions unpack to the same versioned directory name
        if (distro == 'zip' || distro == 'tar') {
            return new File(baseDir, "elasticsearch-${VersionProperties.elasticsearch}")
        }
        throw new InvalidUserDataException("Unknown distribution: ${distro}")
    }
}

View File

@@ -75,7 +75,7 @@ class RestIntegTestTask extends RandomizedTestingTask {
ClusterFormationTasks.setup(project, this, clusterConfig)
configure {
parallelism '1'
systemProperty 'tests.cluster', "localhost:${clusterConfig.transportPort}"
systemProperty 'tests.cluster', "localhost:${clusterConfig.baseTransportPort}"
}
}
}

View File

@@ -6,7 +6,7 @@ import org.gradle.api.internal.tasks.options.Option
class RunTask extends DefaultTask {
ClusterConfiguration clusterConfig = new ClusterConfiguration(httpPort: 9200, transportPort: 9300, daemonize: false)
ClusterConfiguration clusterConfig = new ClusterConfiguration(baseHttpPort: 9200, baseTransportPort: 9300, daemonize: false)
RunTask() {
project.afterEvaluate {