Build: Improve integ test to match ant behavior

Many other improvements:
* Use spaces in ES path
* Use space in path for plugin file installation
* Use a different cwd than ES home
* Use jps to ensure process being stopped is actually elasticsearch
* Stop ES if pid file already exists
* Delete pid file when successfully killed

Also, refactored the cluster formation code to be a little more organized.

closes #14464
This commit is contained in:
Ryan Ernst 2015-11-06 23:20:40 -08:00
parent f03196193f
commit 9ee315a9c8
4 changed files with 248 additions and 169 deletions

View File

@ -49,7 +49,7 @@ class PluginBuildPlugin extends BuildPlugin {
project.integTest.configure {
dependsOn project.bundlePlugin
cluster {
plugin 'installPlugin', project.bundlePlugin.outputs.files
plugin project.name, project.bundlePlugin.outputs.files
}
}
}

View File

@ -41,25 +41,18 @@ class ClusterConfiguration {
Map<String, String> systemProperties = new HashMap<>()
@Input
void systemProperty(String property, String value) {
systemProperties.put(property, value)
}
LinkedHashMap<String, FileCollection> plugins = new LinkedHashMap<>()
LinkedHashMap<String, Object[]> setupCommands = new LinkedHashMap<>()
@Input
void plugin(String name, FileCollection file) {
setupCommands.put(name, ['bin/plugin', 'install', new LazyFileUri(file: file)])
plugins.put(name, file)
}
static class LazyFileUri {
FileCollection file
@Override
String toString() {
return file.singleFile.toURI().toURL().toString();
}
@Input
void systemProperty(String property, String value) {
systemProperties.put(property, value)
}
@Input

View File

@ -25,6 +25,7 @@ import org.gradle.api.GradleException
import org.gradle.api.InvalidUserDataException
import org.gradle.api.Project
import org.gradle.api.Task
import org.gradle.api.file.FileCollection
import org.gradle.api.tasks.Copy
import org.gradle.api.tasks.Delete
import org.gradle.api.tasks.Exec
@ -35,140 +36,91 @@ import org.gradle.api.tasks.Exec
class ClusterFormationTasks {
/**
* Adds dependent tasks to the given task to start a cluster with the given configuration.
* Also adds a finalize task to stop the cluster.
* Adds dependent tasks to the given task to start and stop a cluster with the given configuration.
*/
static void setup(Project project, Task task, ClusterConfiguration config) {
if (task.getEnabled() == false) {
// no need to cluster formation if the task won't run!
// no need to add cluster formation tasks if the task won't run!
return
}
configureDistributionDependency(project, config.distribution)
File clusterDir = new File(project.buildDir, 'cluster' + File.separator + task.name)
if (config.numNodes == 1) {
addNodeStartupTasks(project, task, config, clusterDir)
addNodeStopTask(project, task, clusterDir)
} else {
for (int i = 0; i < config.numNodes; ++i) {
File nodeDir = new File(clusterDir, "node${i}")
addNodeStartupTasks(project, task, config, nodeDir)
addNodeStopTask(project, task, nodeDir)
}
for (int i = 0; i < config.numNodes; ++i) {
File nodeDir = new File(project.buildDir, "cluster/${task.name} node${i}")
configureTasks(project, task, config, nodeDir)
}
}
static void addNodeStartupTasks(Project project, Task task, ClusterConfiguration config, File baseDir) {
File pidFile = pidFile(baseDir)
/** Adds a dependency on the given distribution */
static void configureDistributionDependency(Project project, String distro) {
String elasticsearchVersion = ElasticsearchProperties.version
String packaging = distro == 'tar' ? 'tar.gz' : distro
project.configurations {
elasticsearchDistro
}
project.dependencies {
elasticsearchDistro "org.elasticsearch.distribution.${distro}:elasticsearch:${elasticsearchVersion}@${packaging}"
}
}
/**
* Adds dependent tasks to start an elasticsearch cluster before the given task is executed,
* and stop it after it has finished executing.
*
* The setup of the cluster involves the following:
* <ol>
* <li>Cleanup the extraction directory</li>
* <li>Extract a fresh copy of elasticsearch</li>
* <li>Write an elasticsearch.yml config file</li>
* <li>Copy plugins that will be installed to a temporary dir (which contains spaces)</li>
* <li>Install plugins</li>
* <li>Run additional setup commands</li>
* <li>Start elasticsearch<li>
* </ol>
*/
static void configureTasks(Project project, Task task, ClusterConfiguration config, File baseDir) {
String clusterName = "${task.path.replace(':', '_').substring(1)}"
File pidFile = pidFile(baseDir)
File home = homeDir(baseDir, config.distribution)
Map esConfig = [
'cluster.name': clusterName,
'http.port': config.httpPort,
'transport.tcp.port': config.transportPort,
'pidfile': pidFile,
// TODO: make this work for multi node!
'discovery.zen.ping.unicast.hosts': "localhost:${config.transportPort}",
'path.repo': "${home}/repo",
'path.shared_data': "${home}/../",
// Define a node attribute so we can test that it exists
'node.testattr': 'test',
'repositories.url.allowed_urls': 'http://snapshot.test*'
]
Map esEnv = [
'JAVA_HOME': System.getProperty('java.home'),
'ES_GC_OPTS': config.jvmArgs
]
File cwd = new File(baseDir, "cwd")
File pluginsTmpDir = new File(baseDir, "plugins tmp")
List setupDeps = [] // need to copy the deps, since start will later be added, which would create a circular task dep!
setupDeps.addAll(task.dependsOn)
Task setup = project.tasks.create(name: "${task.name}#clean", type: Delete, dependsOn: setupDeps) {
delete baseDir
}
setup = configureExtractTask(project, "${task.name}#extract", config.distribution, baseDir, setup)
// chain setup tasks to maintain their order
setup = project.tasks.create(name: "${task.name}#configure", type: DefaultTask, dependsOn: setup) << {
File configFile = new File(home, 'config/elasticsearch.yml')
logger.info("Configuring ${configFile}")
configFile.setText(esConfig.collect { key, value -> "${key}: ${value}" }.join('\n'), 'UTF-8')
}
for (Map.Entry<String, String> command : config.setupCommands.entrySet()) {
Task nextSetup = project.tasks.create(name: "${task.name}#${command.getKey()}", type: Exec, dependsOn: setup) {
workingDir home
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
executable 'cmd'
args '/C', 'call'
} else {
executable 'sh'
}
args command.getValue()
// only show output on failure, when not in info or debug mode
if (logger.isInfoEnabled() == false) {
standardOutput = new ByteArrayOutputStream()
errorOutput = standardOutput
ignoreExitValue = true
doLast {
if (execResult.exitValue != 0) {
logger.error(standardOutput.toString())
throw new GradleException("Process '${command.getValue().join(' ')}' finished with non-zero exit value ${execResult.exitValue}")
}
}
}
// tasks are chained so their execution order is maintained
Task setup = project.tasks.create(name: "${task.name}#clean", type: Delete, dependsOn: task.dependsOn.collect()) {
delete home
doLast {
cwd.mkdirs()
}
setup = nextSetup
}
setup = configureCheckPreviousTask("${task.name}#checkPrevious", project, setup, pidFile)
setup = configureStopTask("${task.name}#stopPrevious", project, setup, pidFile)
setup = configureExtractTask("${task.name}#extract", project, setup, baseDir, config.distribution)
setup = configureWriteConfigTask("${task.name}#configure", project, setup, home, config, clusterName, pidFile)
setup = configureCopyPluginsTask("${task.name}#copyPlugins", project, setup, pluginsTmpDir, config)
// install plugins
for (Map.Entry<String, FileCollection> plugin : config.plugins.entrySet()) {
String camelName = plugin.getKey().replaceAll(/-(\w)/) { _, c -> c.toUpperCase() }
String taskName = "${task.name}#install${camelName.capitalize()}"
// delay reading the file location until execution time
String file = "${ -> new File(pluginsTmpDir, plugin.getValue().singleFile.getName()).toURI().toURL().toString() }"
Object[] args = [new File(home, 'bin/plugin'), 'install', file]
setup = configureExecTask(taskName, project, setup, cwd, args)
}
List esArgs = config.systemProperties.collect {key, value -> "-D${key}=${value}"}
Closure esPostStartActions = { ant, logger ->
ant.waitfor(maxwait: '30', maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond', timeoutproperty: "failed${task.name}#start") {
and {
resourceexists {
file file: pidFile.toString()
}
http(url: "http://localhost:${config.httpPort}")
}
}
if (ant.properties.containsKey("failed${task.name}#start".toString())) {
new File(home, 'logs' + File.separator + clusterName + '.log').eachLine {
line -> logger.error(line)
}
throw new GradleException('Failed to start elasticsearch')
}
}
Task start
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
// elasticsearch.bat is spawned as it has no daemon mode
start = project.tasks.create(name: "${task.name}#start", type: DefaultTask, dependsOn: setup) << {
// Fall back to Ant exec task as Gradle Exec task does not support spawning yet
ant.exec(executable: 'cmd', spawn: true, dir: home) {
esEnv.each { key, value -> env(key: key, value: value) }
(['/C', 'call', 'bin/elasticsearch'] + esArgs).each { arg(value: it) }
}
esPostStartActions(ant, logger)
}
} else {
start = project.tasks.create(name: "${task.name}#start", type: Exec, dependsOn: setup) {
workingDir home
executable 'sh'
args 'bin/elasticsearch', '-d' // daemonize!
args esArgs
environment esEnv
errorOutput = new ByteArrayOutputStream()
doLast {
if (errorOutput.toString().isEmpty() == false) {
logger.error(errorOutput.toString())
new File(home, 'logs' + File.separator + clusterName + '.log').eachLine {
line -> logger.error(line)
}
throw new GradleException('Failed to start elasticsearch')
}
esPostStartActions(ant, logger)
}
}
// extra setup commands
for (Map.Entry<String, Object[]> command : config.setupCommands.entrySet()) {
setup = configureExecTask("${task.name}#${command.getKey()}", project, setup, cwd, command.getValue())
}
Task start = configureStartTask("${task.name}#start", project, setup, cwd, config, clusterName, pidFile, home)
task.dependsOn(start)
Task stop = configureStopTask("${task.name}#stop", project, [], pidFile)
task.finalizedBy(stop)
}
static Task configureExtractTask(Project project, String name, String distro, File baseDir, Task setup) {
/** Adds a task to extract the elasticsearch distribution */
static Task configureExtractTask(String name, Project project, Task setup, File baseDir, String distro) {
List extractDependsOn = [project.configurations.elasticsearchDistro, setup]
Task extract
switch (distro) {
@ -180,7 +132,9 @@ class ClusterFormationTasks {
break;
case 'tar':
extract = project.tasks.create(name: name, type: Copy, dependsOn: extractDependsOn) {
from { project.tarTree(project.resources.gzip(project.configurations.elasticsearchDistro.singleFile)) }
from {
project.tarTree(project.resources.gzip(project.configurations.elasticsearchDistro.singleFile))
}
into baseDir
}
break;
@ -190,6 +144,176 @@ class ClusterFormationTasks {
return extract
}
/** Adds a task to write elasticsearch.yml for the given node configuration */
static Task configureWriteConfigTask(String name, Project project, Task setup, File home, ClusterConfiguration config, String clusterName, File pidFile) {
Map esConfig = [
'cluster.name' : clusterName,
'http.port' : config.httpPort,
'transport.tcp.port' : config.transportPort,
'pidfile' : pidFile,
// TODO: make this work for multi node!
'discovery.zen.ping.unicast.hosts': "localhost:${config.transportPort}",
'path.repo' : "${home}/repo",
'path.shared_data' : "${home}/../",
// Define a node attribute so we can test that it exists
'node.testattr' : 'test',
'repositories.url.allowed_urls' : 'http://snapshot.test*'
]
return project.tasks.create(name: name, type: DefaultTask, dependsOn: setup) << {
File configFile = new File(home, 'config/elasticsearch.yml')
logger.info("Configuring ${configFile}")
configFile.setText(esConfig.collect { key, value -> "${key}: ${value}" }.join('\n'), 'UTF-8')
}
}
/** Adds a task to copy plugins to a temp dir, which they will later be installed from. */
static Task configureCopyPluginsTask(String name, Project project, Task setup, File pluginsTmpDir, ClusterConfiguration config) {
if (config.plugins.isEmpty()) {
return setup
}
return project.tasks.create(name: name, type: Copy, dependsOn: setup) {
into pluginsTmpDir
from(*config.plugins.values().collect { plugin -> return { plugin.singleFile } })
}
}
/** Adds a task to execute a command to help setup the cluster */
static Task configureExecTask(String name, Project project, Task setup, File cwd, Object[] execArgs) {
return project.tasks.create(name: name, type: Exec, dependsOn: setup) {
workingDir cwd
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
executable 'cmd'
args '/C', 'call'
} else {
executable 'sh'
}
args execArgs
// only show output on failure, when not in info or debug mode
if (logger.isInfoEnabled() == false) {
standardOutput = new ByteArrayOutputStream()
errorOutput = standardOutput
ignoreExitValue = true
doLast {
if (execResult.exitValue != 0) {
logger.error(standardOutput.toString())
throw new GradleException("Process '${execArgs.join(' ')}' finished with non-zero exit value ${execResult.exitValue}")
}
}
}
}
}
/** Adds a task to start an elasticsearch node with the given configuration */
static Task configureStartTask(String name, Project project, Task setup, File cwd, ClusterConfiguration config, String clusterName, File pidFile, File home) {
Map esEnv = [
'JAVA_HOME' : System.getProperty('java.home'),
'ES_GC_OPTS': config.jvmArgs
]
List esProps = config.systemProperties.collect { key, value -> "-D${key}=${value}" }
for (Map.Entry<String, String> property : System.properties.entrySet()) {
if (property.getKey().startsWith('es.')) {
esProps.add("-D${property.getKey()}=${property.getValue()}")
}
}
Closure esPostStartActions = { ant, logger ->
ant.waitfor(maxwait: '30', maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond', timeoutproperty: "failed${name.capitalize()}") {
and {
resourceexists {
file file: pidFile.toString()
}
http(url: "http://localhost:${config.httpPort}")
}
}
if (ant.properties.containsKey("failed${name}".toString())) {
File logFile = new File(home, "logs/${clusterName}.log")
if (logFile.exists()) {
logFile.eachLine { line -> logger.error(line) }
}
throw new GradleException('Failed to start elasticsearch')
}
}
File esScript = new File(home, 'bin/elasticsearch')
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
// elasticsearch.bat is spawned as it has no daemon mode
return project.tasks.create(name: name, type: DefaultTask, dependsOn: setup) << {
// Fall back to Ant exec task as Gradle Exec task does not support spawning yet
ant.exec(executable: 'cmd', spawn: true, dir: cwd) {
esEnv.each { key, value -> env(key: key, value: value) }
(['/C', 'call', esScript] + esProps).each { arg(value: it) }
}
esPostStartActions(ant, logger)
}
} else {
return project.tasks.create(name: name, type: Exec, dependsOn: setup) {
workingDir cwd
executable 'sh'
args esScript, '-d' // daemonize!
args esProps
environment esEnv
errorOutput = new ByteArrayOutputStream()
doLast {
if (errorOutput.toString().isEmpty() == false) {
logger.error(errorOutput.toString())
File logFile = new File(home, "logs/${clusterName}.log")
if (logFile.exists()) {
logFile.eachLine { line -> logger.error(line) }
}
throw new GradleException('Failed to start elasticsearch')
}
esPostStartActions(ant, logger)
}
}
}
}
/** Adds a task to check if the process with the given pidfile is actually elasticsearch */
static Task configureCheckPreviousTask(String name, Project project, Object depends, File pidFile) {
return project.tasks.create(name: name, type: Exec, dependsOn: depends) {
onlyIf { pidFile.exists() }
ext.pid = "${ -> pidFile.getText('UTF-8').trim()}"
commandLine new File(System.getenv('JAVA_HOME'), 'bin/jps'), '-l'
standardOutput = new ByteArrayOutputStream()
doLast {
String out = standardOutput.toString()
if (out.contains("${pid} org.elasticsearch.bootstrap.Elasticsearch") == false) {
logger.error('jps -l')
logger.error(out)
logger.error("pid file: ${pidFile}")
logger.error("pid: ${pid}")
throw new GradleException("jps -l did not report any process with org.elasticsearch.bootstrap.Elasticsearch\n" +
"Did you run gradle clean? Maybe an old pid file is still lying around.")
} else {
logger.info(out)
}
}
}
}
/** Adds a task to kill an elasticsearch node with the given pidfile */
static Task configureStopTask(String name, Project project, Object depends, File pidFile) {
return project.tasks.create(name: name, type: Exec, dependsOn: depends) {
onlyIf { pidFile.exists() }
ext.pid = "${ -> pidFile.getText('UTF-8').trim()}"
doFirst {
logger.info("Shutting down external node with pid ${pid}")
}
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
executable 'Taskkill'
args '/PID', pid, '/F'
} else {
executable 'kill'
args '-9', pid
}
doLast {
project.delete(pidFile)
}
}
}
/** Returns the directory elasticsearch home is contained in for the given distribution */
static File homeDir(File baseDir, String distro) {
String path
switch (distro) {
@ -203,44 +327,7 @@ class ClusterFormationTasks {
return new File(baseDir, path)
}
static void addNodeStopTask(Project project, Task task, File baseDir) {
LazyPidReader pidFile = new LazyPidReader(pidFile: pidFile(baseDir))
Task stop = project.tasks.create(name: "${task.name}#stop", type: Exec) {
if (Os.isFamily(Os.FAMILY_WINDOWS)) {
executable 'Taskkill'
args '/PID', pidFile, '/F'
} else {
executable 'kill'
args '-9', pidFile
}
doLast {
// TODO: wait for pid to close, or kill -9 and fail
}
}
task.finalizedBy(stop)
}
/** Delays reading a pid file until needing to use the pid */
static class LazyPidReader {
File pidFile
@Override
String toString() {
return pidFile.text.stripMargin()
}
}
static File pidFile(File dir) {
return new File(dir, 'es.pid')
}
static void configureDistributionDependency(Project project, String distro) {
String elasticsearchVersion = ElasticsearchProperties.version
String packaging = distro == 'tar' ? 'tar.gz' : distro
project.configurations {
elasticsearchDistro
}
project.dependencies {
elasticsearchDistro "org.elasticsearch.distribution.${distro}:elasticsearch:${elasticsearchVersion}@${packaging}"
}
}
}

View File

@ -26,10 +26,9 @@ for (Project subproj : project.rootProject.subprojects) {
if (subproj.path.startsWith(':plugins:')) {
integTest {
def bundlePlugin = subproj.tasks.findByName('bundlePlugin')
String camelName = subproj.name.replaceAll(/-(\w)/) { _, c -> c.toUpperCase() }
dependsOn bundlePlugin
cluster {
plugin "install${camelName.capitalize()}", bundlePlugin.outputs.files
plugin subproj.name, bundlePlugin.outputs.files
}
}
pluginCount += 1