Build: Allow customizing wait condition and cluster settings
The current wait condition for an integ test cluster being up is a simple http get on the root path for elasticsearch. However, it is useful to allow having arbitrary wait conditions. This change reworks the wait task to first check that each node process started successfully and has a socket up, followed by an arbitrary wait condition which defaults to the current http get. Also, cluster settings are allowed to be added, and overriden. Finally, custom setup commands are made relative to the elasticsearch home dir for each node.
This commit is contained in:
parent
321606d3aa
commit
31b7e91e5a
|
@ -31,10 +31,10 @@ class ClusterConfiguration {
|
|||
int numNodes = 1
|
||||
|
||||
@Input
|
||||
int httpPort = 9400
|
||||
int baseHttpPort = 9400
|
||||
|
||||
@Input
|
||||
int transportPort = 9500
|
||||
int baseTransportPort = 9500
|
||||
|
||||
@Input
|
||||
boolean daemonize = true
|
||||
|
@ -45,22 +45,44 @@ class ClusterConfiguration {
|
|||
@Input
|
||||
String jvmArgs = System.getProperty('tests.jvm.argline', '')
|
||||
|
||||
/**
|
||||
* A closure to call before the cluster is considered ready. The closure is passed the node info,
|
||||
* as well as a groovy AntBuilder, to enable running ant condition checks. The default wait
|
||||
* condition is for http on the http port.
|
||||
*/
|
||||
@Input
|
||||
Closure waitCondition = { NodeInfo node, AntBuilder ant ->
|
||||
File tmpFile = new File(node.cwd, 'wait.success')
|
||||
ant.get(src: "http://localhost:${node.httpPort()}",
|
||||
dest: tmpFile.toString(),
|
||||
ignoreerrors: true, // do not fail on error, so logging buffers can be flushed by the wait task
|
||||
retries: 10)
|
||||
return tmpFile.exists()
|
||||
}
|
||||
|
||||
Map<String, String> systemProperties = new HashMap<>()
|
||||
|
||||
Map<String, String> settings = new HashMap<>()
|
||||
|
||||
LinkedHashMap<String, FileCollection> plugins = new LinkedHashMap<>()
|
||||
|
||||
LinkedHashMap<String, Object[]> setupCommands = new LinkedHashMap<>()
|
||||
|
||||
@Input
|
||||
void plugin(String name, FileCollection file) {
|
||||
plugins.put(name, file)
|
||||
}
|
||||
|
||||
@Input
|
||||
void systemProperty(String property, String value) {
|
||||
systemProperties.put(property, value)
|
||||
}
|
||||
|
||||
@Input
|
||||
void setting(String name, String value) {
|
||||
settings.put(name, value)
|
||||
}
|
||||
|
||||
@Input
|
||||
void plugin(String name, FileCollection file) {
|
||||
plugins.put(name, file)
|
||||
}
|
||||
|
||||
@Input
|
||||
void setupCommand(String name, Object... args) {
|
||||
setupCommands.put(name, args)
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.tools.ant.taskdefs.condition.Os
|
|||
import org.elasticsearch.gradle.VersionProperties
|
||||
import org.gradle.api.*
|
||||
import org.gradle.api.file.FileCollection
|
||||
import org.gradle.api.logging.Logger
|
||||
import org.gradle.api.tasks.Copy
|
||||
import org.gradle.api.tasks.Delete
|
||||
import org.gradle.api.tasks.Exec
|
||||
|
@ -34,87 +35,6 @@ import java.nio.file.Paths
|
|||
*/
|
||||
class ClusterFormationTasks {
|
||||
|
||||
static class NodeInfo {
|
||||
/** common configuration for all nodes, including this one */
|
||||
ClusterConfiguration config
|
||||
/** node number within the cluster, for creating unique names and paths */
|
||||
int nodeNum
|
||||
/** name of the cluster this node is part of */
|
||||
String clusterName
|
||||
/** root directory all node files and operations happen under */
|
||||
File baseDir
|
||||
/** the pid file the node will use */
|
||||
File pidFile
|
||||
/** elasticsearch home dir */
|
||||
File homeDir
|
||||
/** working directory for the node process */
|
||||
File cwd
|
||||
/** file that if it exists, indicates the node failed to start */
|
||||
File failedMarker
|
||||
/** stdout/stderr log of the elasticsearch process for this node */
|
||||
File startLog
|
||||
/** directory to install plugins from */
|
||||
File pluginsTmpDir
|
||||
/** environment variables to start the node with */
|
||||
Map<String, String> env
|
||||
/** arguments to start the node with */
|
||||
List<String> args
|
||||
/** Path to the elasticsearch start script */
|
||||
String esScript
|
||||
/** buffer for ant output when starting this node */
|
||||
ByteArrayOutputStream buffer = new ByteArrayOutputStream()
|
||||
|
||||
/** Creates a node to run as part of a cluster for the given task */
|
||||
NodeInfo(ClusterConfiguration config, int nodeNum, Project project, Task task) {
|
||||
this.config = config
|
||||
this.nodeNum = nodeNum
|
||||
clusterName = "${task.path.replace(':', '_').substring(1)}"
|
||||
baseDir = new File(project.buildDir, "cluster/${task.name} node${nodeNum}")
|
||||
pidFile = new File(baseDir, 'es.pid')
|
||||
homeDir = homeDir(baseDir, config.distribution)
|
||||
cwd = new File(baseDir, "cwd")
|
||||
failedMarker = new File(cwd, 'run.failed')
|
||||
startLog = new File(cwd, 'run.log')
|
||||
pluginsTmpDir = new File(baseDir, "plugins tmp")
|
||||
|
||||
env = [
|
||||
'JAVA_HOME' : project.javaHome,
|
||||
'ES_GC_OPTS': config.jvmArgs // we pass these with the undocumented gc opts so the argline can set gc, etc
|
||||
]
|
||||
args = config.systemProperties.collect { key, value -> "-D${key}=${value}" }
|
||||
for (Map.Entry<String, String> property : System.properties.entrySet()) {
|
||||
if (property.getKey().startsWith('es.')) {
|
||||
args.add("-D${property.getKey()}=${property.getValue()}")
|
||||
}
|
||||
}
|
||||
// running with cmd on windows will look for this with the .bat extension
|
||||
esScript = new File(homeDir, 'bin/elasticsearch').toString()
|
||||
}
|
||||
|
||||
/** Returns debug string for the command that started this node. */
|
||||
String getCommandString() {
|
||||
String esCommandString = "Elasticsearch node ${nodeNum} command: ${esScript} "
|
||||
esCommandString += args.join(' ')
|
||||
esCommandString += '\nenvironment:'
|
||||
env.each { k, v -> esCommandString += "\n ${k}: ${v}" }
|
||||
return esCommandString
|
||||
}
|
||||
|
||||
/** Returns the directory elasticsearch home is contained in for the given distribution */
|
||||
static File homeDir(File baseDir, String distro) {
|
||||
String path
|
||||
switch (distro) {
|
||||
case 'zip':
|
||||
case 'tar':
|
||||
path = "elasticsearch-${VersionProperties.elasticsearch}"
|
||||
break;
|
||||
default:
|
||||
throw new InvalidUserDataException("Unknown distribution: ${distro}")
|
||||
}
|
||||
return new File(baseDir, path)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds dependent tasks to the given task to start and stop a cluster with the given configuration.
|
||||
*/
|
||||
|
@ -194,7 +114,10 @@ class ClusterFormationTasks {
|
|||
|
||||
// extra setup commands
|
||||
for (Map.Entry<String, Object[]> command : node.config.setupCommands.entrySet()) {
|
||||
setup = configureExecTask(taskName(task, node, command.getKey()), project, setup, node, command.getValue())
|
||||
// the first argument is the actual script name, relative to home
|
||||
Object[] args = command.getValue().clone()
|
||||
args[0] = new File(node.homeDir, args[0].toString())
|
||||
setup = configureExecTask(taskName(task, node, command.getKey()), project, setup, node, args)
|
||||
}
|
||||
|
||||
Task start = configureStartTask(taskName(task, node, 'start'), project, setup, node)
|
||||
|
@ -236,10 +159,10 @@ class ClusterFormationTasks {
|
|||
static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node) {
|
||||
Map esConfig = [
|
||||
'cluster.name' : node.clusterName,
|
||||
'http.port' : node.config.httpPort + node.nodeNum,
|
||||
'transport.tcp.port' : node.config.transportPort + node.nodeNum,
|
||||
'http.port' : node.httpPort(),
|
||||
'transport.tcp.port' : node.transportPort(),
|
||||
'pidfile' : node.pidFile,
|
||||
'discovery.zen.ping.unicast.hosts': (0..<node.config.numNodes).collect{"127.0.0.1:${node.config.transportPort + it}"}.join(','),
|
||||
'discovery.zen.ping.unicast.hosts': (0..<node.config.numNodes).collect{"127.0.0.1:${node.config.baseTransportPort + it}"}.join(','),
|
||||
'path.repo' : "${node.homeDir}/repo",
|
||||
'path.shared_data' : "${node.homeDir}/../",
|
||||
// Define a node attribute so we can test that it exists
|
||||
|
@ -305,7 +228,7 @@ class ClusterFormationTasks {
|
|||
}
|
||||
|
||||
// this closure is converted into ant nodes by groovy's AntBuilder
|
||||
Closure antRunner = {
|
||||
Closure antRunner = { AntBuilder ant ->
|
||||
// we must add debug options inside the closure so the config is read at execution time, as
|
||||
// gradle task options are not processed until the end of the configuration phase
|
||||
if (node.config.debug) {
|
||||
|
@ -334,7 +257,7 @@ class ClusterFormationTasks {
|
|||
script = wrapperScript.toString()
|
||||
}
|
||||
|
||||
exec(executable: executable, spawn: node.config.daemonize, dir: node.cwd, taskname: 'elasticsearch') {
|
||||
ant.exec(executable: executable, spawn: node.config.daemonize, dir: node.cwd, taskname: 'elasticsearch') {
|
||||
node.env.each { key, value -> env(key: key, value: value) }
|
||||
arg(value: script)
|
||||
node.args.each { arg(value: it) }
|
||||
|
@ -347,7 +270,6 @@ class ClusterFormationTasks {
|
|||
node.getCommandString().eachLine { line -> logger.info(line) }
|
||||
|
||||
if (logger.isInfoEnabled() || node.config.daemonize == false) {
|
||||
// run with piping streams directly out (even stderr to stdout since gradle would capture it)
|
||||
runAntCommand(project, antRunner, System.out, System.err)
|
||||
} else {
|
||||
// buffer the output, we may not need to print it
|
||||
|
@ -364,7 +286,7 @@ class ClusterFormationTasks {
|
|||
static Task configureWaitTask(String name, Project project, List<NodeInfo> nodes, List<Task> startTasks) {
|
||||
Task wait = project.tasks.create(name: name, dependsOn: startTasks)
|
||||
wait.doLast {
|
||||
ant.waitfor(maxwait: '30', maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond', timeoutproperty: "failed${name}") {
|
||||
ant.waitfor(maxwait: '15', maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond', timeoutproperty: "failed${name}") {
|
||||
or {
|
||||
for (NodeInfo node : nodes) {
|
||||
resourceexists {
|
||||
|
@ -376,7 +298,7 @@ class ClusterFormationTasks {
|
|||
resourceexists {
|
||||
file(file: node.pidFile.toString())
|
||||
}
|
||||
http(url: "http://localhost:${node.config.httpPort + node.nodeNum}")
|
||||
socket(server: '127.0.0.1', port: node.httpPort())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -386,26 +308,48 @@ class ClusterFormationTasks {
|
|||
anyNodeFailed |= node.failedMarker.exists()
|
||||
}
|
||||
if (ant.properties.containsKey("failed${name}".toString()) || anyNodeFailed) {
|
||||
for (NodeInfo node : nodes) {
|
||||
if (logger.isInfoEnabled() == false) {
|
||||
// We already log the command at info level. No need to do it twice.
|
||||
node.getCommandString().eachLine { line -> logger.error(line) }
|
||||
}
|
||||
// the waitfor failed, so dump any output we got (may be empty if info logging, but that is ok)
|
||||
logger.error("Node ${node.nodeNum} ant output:")
|
||||
node.buffer.toString('UTF-8').eachLine { line -> logger.error(line) }
|
||||
// also dump the log file for the startup script (which will include ES logging output to stdout)
|
||||
if (node.startLog.exists()) {
|
||||
logger.error("Node ${node.nodeNum} log:")
|
||||
node.startLog.eachLine { line -> logger.error(line) }
|
||||
}
|
||||
waitFailed(nodes, logger, 'Failed to start elasticsearch')
|
||||
}
|
||||
|
||||
// go through each node checking the wait condition
|
||||
for (NodeInfo node : nodes) {
|
||||
// first bind node info to the closure, then pass to the ant runner so we can get good logging
|
||||
Closure antRunner = node.config.waitCondition.curry(node)
|
||||
|
||||
boolean success
|
||||
if (logger.isInfoEnabled()) {
|
||||
success = runAntCommand(project, antRunner, System.out, System.err)
|
||||
} else {
|
||||
PrintStream captureStream = new PrintStream(node.buffer, true, "UTF-8")
|
||||
success = runAntCommand(project, antRunner, captureStream, captureStream)
|
||||
}
|
||||
|
||||
if (success == false) {
|
||||
waitFailed(nodes, logger, 'Elasticsearch cluster failed to pass wait condition')
|
||||
}
|
||||
throw new GradleException('Failed to start elasticsearch')
|
||||
}
|
||||
}
|
||||
return wait
|
||||
}
|
||||
|
||||
static void waitFailed(List<NodeInfo> nodes, Logger logger, String msg) {
|
||||
for (NodeInfo node : nodes) {
|
||||
if (logger.isInfoEnabled() == false) {
|
||||
// We already log the command at info level. No need to do it twice.
|
||||
node.getCommandString().eachLine { line -> logger.error(line) }
|
||||
}
|
||||
// the waitfor failed, so dump any output we got (may be empty if info logging, but that is ok)
|
||||
logger.error("Node ${node.nodeNum} ant output:")
|
||||
node.buffer.toString('UTF-8').eachLine { line -> logger.error(line) }
|
||||
// also dump the log file for the startup script (which will include ES logging output to stdout)
|
||||
if (node.startLog.exists()) {
|
||||
logger.error("Node ${node.nodeNum} log:")
|
||||
node.startLog.eachLine { line -> logger.error(line) }
|
||||
}
|
||||
}
|
||||
throw new GradleException(msg)
|
||||
}
|
||||
|
||||
/** Adds a task to check if the process with the given pidfile is actually elasticsearch */
|
||||
static Task configureCheckPreviousTask(String name, Project project, Object depends, NodeInfo node) {
|
||||
return project.tasks.create(name: name, type: Exec, dependsOn: depends) {
|
||||
|
@ -475,14 +419,15 @@ class ClusterFormationTasks {
|
|||
}
|
||||
|
||||
/** Runs an ant command, sending output to the given out and error streams */
|
||||
static void runAntCommand(Project project, Closure command, PrintStream outputStream, PrintStream errorStream) {
|
||||
static Object runAntCommand(Project project, Closure command, PrintStream outputStream, PrintStream errorStream) {
|
||||
DefaultLogger listener = new DefaultLogger(
|
||||
errorPrintStream: errorStream,
|
||||
outputPrintStream: outputStream,
|
||||
messageOutputLevel: org.apache.tools.ant.Project.MSG_INFO)
|
||||
|
||||
project.ant.project.addBuildListener(listener)
|
||||
project.configure(project.ant, command)
|
||||
Object retVal = command(project.ant)
|
||||
project.ant.project.removeBuildListener(listener)
|
||||
return retVal
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,131 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.gradle.test
|
||||
|
||||
import org.elasticsearch.gradle.VersionProperties
|
||||
import org.gradle.api.InvalidUserDataException
|
||||
import org.gradle.api.Project
|
||||
import org.gradle.api.Task
|
||||
|
||||
/**
|
||||
* A container for the files and configuration associated with a single node in a test cluster.
|
||||
*/
|
||||
class NodeInfo {
|
||||
/** common configuration for all nodes, including this one */
|
||||
ClusterConfiguration config
|
||||
|
||||
/** node number within the cluster, for creating unique names and paths */
|
||||
int nodeNum
|
||||
|
||||
/** name of the cluster this node is part of */
|
||||
String clusterName
|
||||
|
||||
/** root directory all node files and operations happen under */
|
||||
File baseDir
|
||||
|
||||
/** the pid file the node will use */
|
||||
File pidFile
|
||||
|
||||
/** elasticsearch home dir */
|
||||
File homeDir
|
||||
|
||||
/** working directory for the node process */
|
||||
File cwd
|
||||
|
||||
/** file that if it exists, indicates the node failed to start */
|
||||
File failedMarker
|
||||
|
||||
/** stdout/stderr log of the elasticsearch process for this node */
|
||||
File startLog
|
||||
|
||||
/** directory to install plugins from */
|
||||
File pluginsTmpDir
|
||||
|
||||
/** environment variables to start the node with */
|
||||
Map<String, String> env
|
||||
|
||||
/** arguments to start the node with */
|
||||
List<String> args
|
||||
|
||||
/** Path to the elasticsearch start script */
|
||||
String esScript
|
||||
|
||||
/** buffer for ant output when starting this node */
|
||||
ByteArrayOutputStream buffer = new ByteArrayOutputStream()
|
||||
|
||||
/** Creates a node to run as part of a cluster for the given task */
|
||||
NodeInfo(ClusterConfiguration config, int nodeNum, Project project, Task task) {
|
||||
this.config = config
|
||||
this.nodeNum = nodeNum
|
||||
clusterName = "${task.path.replace(':', '_').substring(1)}"
|
||||
baseDir = new File(project.buildDir, "cluster/${task.name} node${nodeNum}")
|
||||
pidFile = new File(baseDir, 'es.pid')
|
||||
homeDir = homeDir(baseDir, config.distribution)
|
||||
cwd = new File(baseDir, "cwd")
|
||||
failedMarker = new File(cwd, 'run.failed')
|
||||
startLog = new File(cwd, 'run.log')
|
||||
pluginsTmpDir = new File(baseDir, "plugins tmp")
|
||||
|
||||
env = [
|
||||
'JAVA_HOME' : project.javaHome,
|
||||
'ES_GC_OPTS': config.jvmArgs // we pass these with the undocumented gc opts so the argline can set gc, etc
|
||||
]
|
||||
args = config.systemProperties.collect { key, value -> "-D${key}=${value}" }
|
||||
for (Map.Entry<String, String> property : System.properties.entrySet()) {
|
||||
if (property.getKey().startsWith('es.')) {
|
||||
args.add("-D${property.getKey()}=${property.getValue()}")
|
||||
}
|
||||
}
|
||||
// running with cmd on windows will look for this with the .bat extension
|
||||
esScript = new File(homeDir, 'bin/elasticsearch').toString()
|
||||
}
|
||||
|
||||
/** Returns debug string for the command that started this node. */
|
||||
String getCommandString() {
|
||||
String esCommandString = "Elasticsearch node ${nodeNum} command: ${esScript} "
|
||||
esCommandString += args.join(' ')
|
||||
esCommandString += '\nenvironment:'
|
||||
env.each { k, v -> esCommandString += "\n ${k}: ${v}" }
|
||||
return esCommandString
|
||||
}
|
||||
|
||||
/** Returns the http port for this node */
|
||||
int httpPort() {
|
||||
return config.baseHttpPort + nodeNum
|
||||
}
|
||||
|
||||
/** Returns the transport port for this node */
|
||||
int transportPort() {
|
||||
return config.baseTransportPort + nodeNum
|
||||
}
|
||||
|
||||
/** Returns the directory elasticsearch home is contained in for the given distribution */
|
||||
static File homeDir(File baseDir, String distro) {
|
||||
String path
|
||||
switch (distro) {
|
||||
case 'zip':
|
||||
case 'tar':
|
||||
path = "elasticsearch-${VersionProperties.elasticsearch}"
|
||||
break;
|
||||
default:
|
||||
throw new InvalidUserDataException("Unknown distribution: ${distro}")
|
||||
}
|
||||
return new File(baseDir, path)
|
||||
}
|
||||
}
|
|
@ -75,7 +75,7 @@ class RestIntegTestTask extends RandomizedTestingTask {
|
|||
ClusterFormationTasks.setup(project, this, clusterConfig)
|
||||
configure {
|
||||
parallelism '1'
|
||||
systemProperty 'tests.cluster', "localhost:${clusterConfig.transportPort}"
|
||||
systemProperty 'tests.cluster', "localhost:${clusterConfig.baseTransportPort}"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ import org.gradle.api.internal.tasks.options.Option
|
|||
|
||||
class RunTask extends DefaultTask {
|
||||
|
||||
ClusterConfiguration clusterConfig = new ClusterConfiguration(httpPort: 9200, transportPort: 9300, daemonize: false)
|
||||
ClusterConfiguration clusterConfig = new ClusterConfiguration(baseHttpPort: 9200, baseTransportPort: 9300, daemonize: false)
|
||||
|
||||
RunTask() {
|
||||
project.afterEvaluate {
|
||||
|
|
Loading…
Reference in New Issue