Build: Get multi node smoke tests working
This change adds back the multi node smoke test, as well as making the cluster formation for any test allow multiple nodes. The main changes in cluster formation are abstracting out the node specific configuration to a helper struct, as well as making a single wait task that waits for all nodes after their start tasks have run. The output on failure was also improved to log which node's info is being printed.
This commit is contained in:
@ -34,6 +34,87 @@ import java.nio.file.Paths
class ClusterFormationTasks {
static class NodeInfo {
/** common configuration for all nodes, including this one */
ClusterConfiguration config
/** node number within the cluster, for creating unique names and paths */
int nodeNum
/** name of the cluster this node is part of */
String clusterName
/** root directory all node files and operations happen under */
File baseDir
/** the pid file the node will use */
File pidFile
/** elasticsearch home dir */
File homeDir
/** working directory for the node process */
File cwd
/** file that if it exists, indicates the node failed to start */
File failedMarker
/** stdout/stderr log of the elasticsearch process for this node */
File startLog
/** directory to install plugins from */
File pluginsTmpDir
/** environment variables to start the node with */
Map<String, String> env
/** arguments to start the node with */
List<String> args
/** Path to the elasticsearch start script */
String esScript
/** buffer for ant output when starting this node */
ByteArrayOutputStream buffer = new ByteArrayOutputStream()
/** Creates a node to run as part of a cluster for the given task */
NodeInfo(ClusterConfiguration config, int nodeNum, Project project, Task task) {
this.config = config
this.nodeNum = nodeNum
clusterName = "${task.path.replace(':', '_').substring(1)}"
baseDir = new File(project.buildDir, "cluster/${task.name} node${nodeNum}")
pidFile = new File(baseDir, 'es.pid')
homeDir = homeDir(baseDir, config.distribution)
cwd = new File(baseDir, "cwd")
failedMarker = new File(cwd, 'run.failed')
startLog = new File(cwd, 'run.log')
pluginsTmpDir = new File(baseDir, "plugins tmp")
env = [
'JAVA_HOME' : project.javaHome,
'ES_GC_OPTS': config.jvmArgs // we pass these with the undocumented gc opts so the argline can set gc, etc
args = config.systemProperties.collect { key, value -> "-D${key}=${value}" }
for (Map.Entry<String, String> property : System.properties.entrySet()) {
if (property.getKey().startsWith('es.')) {
// running with cmd on windows will look for this with the .bat extension
esScript = new File(homeDir, 'bin/elasticsearch').toString()
/** Returns debug string for the command that started this node. */
String getCommandString() {
String esCommandString = "Elasticsearch node ${nodeNum} command: ${esScript} "
esCommandString += args.join(' ')
esCommandString += '\nenvironment:'
env.each { k, v -> esCommandString += "\n ${k}: ${v}" }
return esCommandString
/** Returns the directory elasticsearch home is contained in for the given distribution */
static File homeDir(File baseDir, String distro) {
String path
switch (distro) {
case 'zip':
case 'tar':
path = "elasticsearch-${VersionProperties.elasticsearch}"
throw new InvalidUserDataException("Unknown distribution: ${distro}")
return new File(baseDir, path)
* Adds dependent tasks to the given task to start and stop a cluster with the given configuration.
@ -43,10 +124,16 @@ class ClusterFormationTasks {
configureDistributionDependency(project, config.distribution)
List<Task> startTasks = []
List<NodeInfo> nodes = []
for (int i = 0; i < config.numNodes; ++i) {
File nodeDir = new File(project.buildDir, "cluster/${task.name} node${i}")
configureTasks(project, task, config, nodeDir)
NodeInfo node = new NodeInfo(config, i, project, task)
startTasks.add(configureNode(project, task, node))
Task wait = configureWaitTask("${task.name}#wait", project, nodes, startTasks)
/** Adds a dependency on the given distribution */
@ -75,63 +162,60 @@ class ClusterFormationTasks {
* <li>Run additional setup commands</li>
* <li>Start elasticsearch<li>
* </ol>
* @return a task which starts the node.
static void configureTasks(Project project, Task task, ClusterConfiguration config, File baseDir) {
String clusterName = "${task.path.replace(':', '_').substring(1)}"
File pidFile = pidFile(baseDir)
File home = homeDir(baseDir, config.distribution)
File cwd = new File(baseDir, "cwd")
File pluginsTmpDir = new File(baseDir, "plugins tmp")
static Task configureNode(Project project, Task task, NodeInfo node) {
// tasks are chained so their execution order is maintained
Task setup = project.tasks.create(name: "${task.name}#clean", type: Delete, dependsOn: task.dependsOn.collect()) {
delete home
delete cwd
Task setup = project.tasks.create(name: taskName(task, node, 'clean'), type: Delete, dependsOn: task.dependsOn.collect()) {
delete node.homeDir
delete node.cwd
doLast {
setup = configureCheckPreviousTask("${task.name}#checkPrevious", project, setup, pidFile)
setup = configureStopTask("${task.name}#stopPrevious", project, setup, pidFile)
setup = configureExtractTask("${task.name}#extract", project, setup, baseDir, config.distribution)
setup = configureWriteConfigTask("${task.name}#configure", project, setup, home, config, clusterName, pidFile)
setup = configureCopyPluginsTask("${task.name}#copyPlugins", project, setup, pluginsTmpDir, config)
setup = configureCheckPreviousTask(taskName(task, node, 'checkPrevious'), project, setup, node)
setup = configureStopTask(taskName(task, node, 'stopPrevious'), project, setup, node)
setup = configureExtractTask(taskName(task, node, 'extract'), project, setup, node)
setup = configureWriteConfigTask(taskName(task, node, 'configure'), project, setup, node)
setup = configureCopyPluginsTask(taskName(task, node, 'copyPlugins'), project, setup, node)
// install plugins
for (Map.Entry<String, FileCollection> plugin : config.plugins.entrySet()) {
for (Map.Entry<String, FileCollection> plugin : node.config.plugins.entrySet()) {
// replace every dash followed by a character with just the uppercase character
String camelName = plugin.getKey().replaceAll(/-(\w)/) { _, c -> c.toUpperCase(Locale.ROOT) }
String taskName = "${task.name}#install${camelName[0].toUpperCase(Locale.ROOT) + camelName.substring(1)}Plugin"
String actionName = "install${camelName[0].toUpperCase(Locale.ROOT) + camelName.substring(1)}Plugin"
// delay reading the file location until execution time by wrapping in a closure within a GString
String file = "${-> new File(pluginsTmpDir, plugin.getValue().singleFile.getName()).toURI().toURL().toString()}"
Object[] args = [new File(home, 'bin/plugin'), 'install', file]
setup = configureExecTask(taskName, project, setup, cwd, args)
String file = "${-> new File(node.pluginsTmpDir, plugin.getValue().singleFile.getName()).toURI().toURL().toString()}"
Object[] args = [new File(node.homeDir, 'bin/plugin'), 'install', file]
setup = configureExecTask(taskName(task, node, actionName), project, setup, node, args)
// extra setup commands
for (Map.Entry<String, Object[]> command : config.setupCommands.entrySet()) {
setup = configureExecTask("${task.name}#${command.getKey()}", project, setup, cwd, command.getValue())
for (Map.Entry<String, Object[]> command : node.config.setupCommands.entrySet()) {
setup = configureExecTask(taskName(task, node, command.getKey()), project, setup, node, command.getValue())
Task start = configureStartTask("${task.name}#start", project, setup, cwd, config, clusterName, pidFile, home)
Task start = configureStartTask(taskName(task, node, 'start'), project, setup, node)
if (config.daemonize) {
if (node.config.daemonize) {
// if we are running in the background, make sure to stop the server when the task completes
Task stop = configureStopTask("${task.name}#stop", project, [], pidFile)
Task stop = configureStopTask(taskName(task, node, 'stop'), project, [], node)
return start
/** Adds a task to extract the elasticsearch distribution */
static Task configureExtractTask(String name, Project project, Task setup, File baseDir, String distro) {
static Task configureExtractTask(String name, Project project, Task setup, NodeInfo node) {
List extractDependsOn = [project.configurations.elasticsearchDistro, setup]
Task extract
switch (distro) {
switch (node.config.distribution) {
case 'zip':
extract = project.tasks.create(name: name, type: Copy, dependsOn: extractDependsOn) {
from { project.zipTree(project.configurations.elasticsearchDistro.singleFile) }
into baseDir
into node.baseDir
case 'tar':
@ -139,54 +223,53 @@ class ClusterFormationTasks {
from {
into baseDir
into node.baseDir
throw new InvalidUserDataException("Unknown distribution: ${distro}")
throw new InvalidUserDataException("Unknown distribution: ${node.config.distribution}")
return extract
/** Adds a task to write elasticsearch.yml for the given node configuration */
static Task configureWriteConfigTask(String name, Project project, Task setup, File home, ClusterConfiguration config, String clusterName, File pidFile) {
static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node) {
Map esConfig = [
'cluster.name' : clusterName,
'http.port' : config.httpPort,
'transport.tcp.port' : config.transportPort,
'pidfile' : pidFile,
// TODO: make this work for multi node!
'discovery.zen.ping.unicast.hosts': "localhost:${config.transportPort}",
'path.repo' : "${home}/repo",
'path.shared_data' : "${home}/../",
'cluster.name' : node.clusterName,
'http.port' : node.config.httpPort + node.nodeNum,
'transport.tcp.port' : node.config.transportPort + node.nodeNum,
'pidfile' : node.pidFile,
'discovery.zen.ping.unicast.hosts': (0..<node.config.numNodes).collect{"${node.config.transportPort + it}"}.join(','),
'path.repo' : "${node.homeDir}/repo",
'path.shared_data' : "${node.homeDir}/../",
// Define a node attribute so we can test that it exists
'node.testattr' : 'test',
'repositories.url.allowed_urls' : 'http://snapshot.test*'
return project.tasks.create(name: name, type: DefaultTask, dependsOn: setup) << {
File configFile = new File(home, 'config/elasticsearch.yml')
File configFile = new File(node.homeDir, 'config/elasticsearch.yml')
logger.info("Configuring ${configFile}")
configFile.setText(esConfig.collect { key, value -> "${key}: ${value}" }.join('\n'), 'UTF-8')
/** Adds a task to copy plugins to a temp dir, which they will later be installed from. */
static Task configureCopyPluginsTask(String name, Project project, Task setup, File pluginsTmpDir, ClusterConfiguration config) {
if (config.plugins.isEmpty()) {
static Task configureCopyPluginsTask(String name, Project project, Task setup, NodeInfo node) {
if (node.config.plugins.isEmpty()) {
return setup
return project.tasks.create(name: name, type: Copy, dependsOn: setup) {
into pluginsTmpDir
into node.pluginsTmpDir
/** Adds a task to execute a command to help setup the cluster */
static Task configureExecTask(String name, Project project, Task setup, File cwd, Object[] execArgs) {
static Task configureExecTask(String name, Project project, Task setup, NodeInfo node, Object[] execArgs) {
return project.tasks.create(name: name, type: Exec, dependsOn: setup) {
workingDir cwd
workingDir node.cwd
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
executable 'cmd'
args '/C', 'call'
@ -210,21 +293,8 @@ class ClusterFormationTasks {
/** Adds a task to start an elasticsearch node with the given configuration */
static Task configureStartTask(String name, Project project, Task setup, File cwd, ClusterConfiguration config, String clusterName, File pidFile, File home) {
Map esEnv = [
'JAVA_HOME' : project.javaHome,
'ES_GC_OPTS': config.jvmArgs // we pass these with the undocumented gc opts so the argline can set gc, etc
List<String> esProps = config.systemProperties.collect { key, value -> "-D${key}=${value}" }
for (Map.Entry<String, String> property : System.properties.entrySet()) {
if (property.getKey().startsWith('es.')) {
static Task configureStartTask(String name, Project project, Task setup, NodeInfo node) {
String executable
// running with cmd on windows will look for this with the .bat extension
String esScript = new File(home, 'bin/elasticsearch').toString()
List<String> esArgs = []
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
executable = 'cmd'
@ -234,15 +304,13 @@ class ClusterFormationTasks {
executable = 'sh'
File failedMarker = new File(cwd, 'run.failed')
// this closure is converted into ant nodes by groovy's AntBuilder
Closure antRunner = {
// we must add debug options inside the closure so the config is read at execution time, as
// gradle task options are not processed until the end of the configuration phase
if (config.debug) {
if (node.config.debug) {
println 'Running elasticsearch in debug mode, suspending until connected on port 8000'
esEnv['JAVA_OPTS'] = '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000'
node.env['JAVA_OPTS'] = '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000'
// Due to how ant exec works with the spawn option, we lose all stdout/stderr from the
@ -251,75 +319,41 @@ class ClusterFormationTasks {
// of the real elasticsearch script. This allows ant to keep the streams open with the
// dummy process, but us to have the output available if there is an error in the
// elasticsearch start script
if (config.daemonize) {
String script = node.esScript
if (node.config.daemonize) {
String scriptName = 'run'
String argsPasser = '"$@"'
String exitMarker = '; if [ $? != 0 ]; then touch run.failed; fi'
String exitMarker = "; if [ \$? != 0 ]; then touch run.failed; fi"
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
scriptName += '.bat'
argsPasser = '%*'
exitMarker = '\r\n if "%errorlevel%" neq "0" ( type nul >> run.failed )'
exitMarker = "\r\n if \"%errorlevel%\" neq \"0\" ( type nul >> run.failed )"
File wrapperScript = new File(cwd, scriptName)
wrapperScript.setText("\"${esScript}\" ${argsPasser} > run.log 2>&1 ${exitMarker}", 'UTF-8')
esScript = wrapperScript.toString()
File wrapperScript = new File(node.cwd, scriptName)
wrapperScript.setText("\"${script}\" ${argsPasser} > run.log 2>&1 ${exitMarker}", 'UTF-8')
script = wrapperScript.toString()
exec(executable: executable, spawn: config.daemonize, dir: cwd, taskname: 'elasticsearch') {
esEnv.each { key, value -> env(key: key, value: value) }
arg(value: esScript)
esProps.each { arg(value: it) }
waitfor(maxwait: '30', maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond', timeoutproperty: "failed${name}") {
or {
resourceexists {
file(file: failedMarker.toString())
and {
resourceexists {
file(file: pidFile.toString())
http(url: "http://localhost:${config.httpPort}")
exec(executable: executable, spawn: node.config.daemonize, dir: node.cwd, taskname: 'elasticsearch') {
node.env.each { key, value -> env(key: key, value: value) }
arg(value: script)
node.args.each { arg(value: it) }
// this closure is the actual code to run elasticsearch
Closure elasticsearchRunner = {
// Command as string for logging
String esCommandString = "Elasticsearch command: ${esScript} "
esCommandString += esProps.join(' ')
if (esEnv.isEmpty() == false) {
esCommandString += '\nenvironment:'
esEnv.each { k, v -> esCommandString += "\n ${k}: ${v}" }
node.getCommandString().eachLine { line -> logger.info(line) }
ByteArrayOutputStream buffer = new ByteArrayOutputStream()
if (logger.isInfoEnabled() || config.daemonize == false) {
if (logger.isInfoEnabled() || node.config.daemonize == false) {
// run with piping streams directly out (even stderr to stdout since gradle would capture it)
runAntCommand(project, antRunner, System.out, System.err)
} else {
// buffer the output, we may not need to print it
PrintStream captureStream = new PrintStream(buffer, true, "UTF-8")
PrintStream captureStream = new PrintStream(node.buffer, true, "UTF-8")
runAntCommand(project, antRunner, captureStream, captureStream)
if (ant.properties.containsKey("failed${name}".toString()) || failedMarker.exists()) {
if (logger.isInfoEnabled() == false) {
// We already log the command at info level. No need to do it twice.
esCommandString.eachLine { line -> logger.error(line) }
// the waitfor failed, so dump any output we got (may be empty if info logging, but that is ok)
buffer.toString('UTF-8').eachLine { line -> logger.error(line) }
// also dump the log file for the startup script (which will include ES logging output to stdout)
File startLog = new File(cwd, 'run.log')
if (startLog.exists()) {
startLog.eachLine { line -> logger.error(line) }
throw new GradleException('Failed to start elasticsearch')
Task start = project.tasks.create(name: name, type: DefaultTask, dependsOn: setup)
@ -327,12 +361,57 @@ class ClusterFormationTasks {
return start
static Task configureWaitTask(String name, Project project, List<NodeInfo> nodes, List<Task> startTasks) {
Task wait = project.tasks.create(name: name, dependsOn: startTasks)
wait.doLast {
ant.waitfor(maxwait: '30', maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond', timeoutproperty: "failed${name}") {
or {
for (NodeInfo node : nodes) {
resourceexists {
file(file: node.failedMarker.toString())
and {
for (NodeInfo node : nodes) {
resourceexists {
file(file: node.pidFile.toString())
http(url: "http://localhost:${node.config.httpPort + node.nodeNum}")
boolean anyNodeFailed = false
for (NodeInfo node : nodes) {
anyNodeFailed |= node.failedMarker.exists()
if (ant.properties.containsKey("failed${name}".toString()) || anyNodeFailed) {
for (NodeInfo node : nodes) {
if (logger.isInfoEnabled() == false) {
// We already log the command at info level. No need to do it twice.
node.getCommandString().eachLine { line -> logger.error(line) }
// the waitfor failed, so dump any output we got (may be empty if info logging, but that is ok)
logger.error("Node ${node.nodeNum} ant output:")
node.buffer.toString('UTF-8').eachLine { line -> logger.error(line) }
// also dump the log file for the startup script (which will include ES logging output to stdout)
if (node.startLog.exists()) {
logger.error("Node ${node.nodeNum} log:")
node.startLog.eachLine { line -> logger.error(line) }
throw new GradleException('Failed to start elasticsearch')
return wait
/** Adds a task to check if the process with the given pidfile is actually elasticsearch */
static Task configureCheckPreviousTask(String name, Project project, Object depends, File pidFile) {
static Task configureCheckPreviousTask(String name, Project project, Object depends, NodeInfo node) {
return project.tasks.create(name: name, type: Exec, dependsOn: depends) {
onlyIf { pidFile.exists() }
onlyIf { node.pidFile.exists() }
// the pid file won't actually be read until execution time, since the read is wrapped within an inner closure of the GString
ext.pid = "${ -> pidFile.getText('UTF-8').trim()}"
ext.pid = "${ -> node.pidFile.getText('UTF-8').trim()}"
File jps
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
jps = getJpsExecutableByName(project, "jps.exe")
@ -365,11 +444,11 @@ class ClusterFormationTasks {
/** Adds a task to kill an elasticsearch node with the given pidfile */
static Task configureStopTask(String name, Project project, Object depends, File pidFile) {
static Task configureStopTask(String name, Project project, Object depends, NodeInfo node) {
return project.tasks.create(name: name, type: Exec, dependsOn: depends) {
onlyIf { pidFile.exists() }
onlyIf { node.pidFile.exists() }
// the pid file won't actually be read until execution time, since the read is wrapped within an inner closure of the GString
ext.pid = "${ -> pidFile.getText('UTF-8').trim()}"
ext.pid = "${ -> node.pidFile.getText('UTF-8').trim()}"
doFirst {
logger.info("Shutting down external node with pid ${pid}")
@ -381,27 +460,18 @@ class ClusterFormationTasks {
args '-9', pid
doLast {
/** Returns the directory elasticsearch home is contained in for the given distribution */
static File homeDir(File baseDir, String distro) {
String path
switch (distro) {
case 'zip':
case 'tar':
path = "elasticsearch-${VersionProperties.elasticsearch}"
throw new InvalidUserDataException("Unknown distribution: ${distro}")
/** Returns a unique task name for this task and node configuration */
static String taskName(Task parentTask, NodeInfo node, String action) {
if (node.config.numNodes > 1) {
return "${parentTask.name}#node${node.nodeNum}.${action}"
} else {
return "${parentTask.name}#${action}"
return new File(baseDir, path)
static File pidFile(File dir) {
return new File(dir, 'es.pid')
/** Runs an ant command, sending output to the given out and error streams */
Normal file
Normal file
@ -0,0 +1,8 @@
apply plugin: 'elasticsearch.rest-test'
integTest {
cluster {
numNodes = 2
@ -1,44 +0,0 @@
<?xml version="1.0"?>
<project name="smoke-test-multinode"
<import file="${elasticsearch.integ.antfile.default}"/>
<property name="integ.pidfile.sec" location="${integ.scratch}/es-secondary.pid"/>
<available property="integ.pidfile.sec.exists" file="${integ.pidfile.sec}"/>
<target name="stop-secondary-node" if="integ.pidfile.sec.exists">
<stop-node es.pidfile="${integ.pidfile.sec}"/>
<target name="stop-primary-node" if="integ.pidfile.exists">
<stop-node es.pidfile="${integ.pidfile}"/>
<target name="stop-external-multi-node-cluster" depends="stop-primary-node, stop-secondary-node"/>
<target name="start-external-multi-node-no-plugins" depends="stop-secondary-node, setup-workspace" unless="${shouldskip}">
<start-unicast-node es.peer.list=""/>
<ac:trycatch property="failure.message">
<start-unicast-node es.http.port="9600" es.transport.port="9700"
<echo>Failed to start second node with message: ${failure.message}</echo>
<stop-node es.pidfile="${integ.pidfile}"/>
<local name="failed.to.form.cluster"/>
<waitfor-two-nodes port="${integ.http.port}"
<fail message="Instances did not form a cluster" if="failed.to.form.cluster"/>
<stop-node es.pidfile="${integ.pidfile}"/>
<stop-node es.pidfile="${integ.pidfile.sec}"/>
@ -1,284 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
This test unzips elasticsearch, installs each plugin,
starts 2 elasticsearch nodes, verifies that they form a cluster.
<name>QA: Smoke Test Multi-Node IT</name>
<description>Tests that multi node IT tests work</description>
<!-- Provided dependencies by elasticsearch itself -->
<!-- Required by the REST test framework -->
<!-- TODO: remove this dependency when we will have a REST Test module -->
<!-- elasticsearch distribution -->
<!-- integration tests -->
<!-- start up external cluster -->
<ant antfile="${elasticsearch.integ.antfile}" target="start-external-multi-node-no-plugins">
<property name="tests.jvm.argline" value="${tests.jvm.argline}"/>
<property name="integ.multi.node" value="true"/>
<!-- shut down external cluster -->
<ant antfile="${elasticsearch.integ.antfile}" target="stop-external-multi-node-cluster"/>
@ -32,6 +32,7 @@ List projects = [
