Use reaper process instead of shutdown hooks for testclusters (#44927)

Testclusters currently protects against clusters outliving a build by
adding a JVM shutdown hook. While this works in some cases, it does not
cover all of them, for example when the daemon is killed with SIGKILL,
in which case shutdown hooks never run.

To handle these other cases, this commit replaces the shutdown hooks with
a separate process (one per build) that manages reaping external services
if gradle dies.
This commit is contained in:
Ryan Ernst 2019-08-02 18:58:04 -07:00 committed by GitHub
parent 5dc22e29c6
commit 925bd4faa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 357 additions and 147 deletions

View File

@ -71,6 +71,15 @@ sourceSets {
minimumRuntime { }
}
allprojects {
targetCompatibility = 11
sourceCompatibility = 11
}
configurations {
reaper
}
compileMinimumRuntimeJava {
targetCompatibility = 8
sourceCompatibility = 8
@ -78,6 +87,9 @@ compileMinimumRuntimeJava {
jar {
from sourceSets.minimumRuntime.output
into('META-INF') {
from configurations.reaper
}
}
javadoc {
@ -115,6 +127,7 @@ dependencies {
testCompile "junit:junit:${props.getProperty('junit')}"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${props.getProperty('randomizedrunner')}"
testCompile 'com.github.tomakehurst:wiremock-jre8-standalone:2.23.2'
reaper project('reaper')
minimumRuntimeCompile "junit:junit:${props.getProperty('junit')}"
minimumRuntimeCompile localGroovy()
minimumRuntimeCompile gradleApi()
@ -145,8 +158,10 @@ if (project != rootProject) {
apply plugin: 'nebula.maven-base-publish'
apply plugin: 'nebula.maven-scm'
targetCompatibility = '10'
sourceCompatibility = '10'
allprojects {
targetCompatibility = 11
sourceCompatibility = 11
}
// groovydoc succeeds, but has some weird internal exception...
groovydoc.enabled = false

View File

@ -0,0 +1,7 @@
// Build script for the standalone reaper tool: a plain Java project packaged as a jar.
apply plugin: 'java'
jar {
manifest {
// Main-Class names the entry point class recorded in the jar manifest
attributes 'Main-Class': 'org.elasticsearch.gradle.reaper.Reaper'
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gradle.reaper;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * A standalone process that will reap external services after a build dies.
 *
 * <h2>Input</h2>
 * Since how to reap a given service is platform and service dependent, this tool
 * operates on system commands to execute. It takes a single argument, a directory
 * that will contain files with reaping commands. Every file in that directory whose
 * name ends with {@code .cmd} holds exactly one command: the file content is split
 * on single spaces and the resulting tokens are passed to {@link Runtime#exec(String[])}.
 *
 * <h2>Lifecycle</h2>
 * The main method will wait indefinitely on the parent process (Gradle) by
 * reading from stdin. When Gradle shuts down, whether normally or abruptly, the
 * pipe will be broken and read will return.
 *
 * The reaper will then iterate over the files in the configured directory,
 * and execute the given commands. Each file is handled independently: if a command
 * cannot be started or exits with a non-zero code, a failure message is written to
 * stderr and the remaining files are still processed. Otherwise, the input file is
 * deleted. If no inputs produced errors, the entire input directory is deleted upon
 * completion of reaping.
 */
public class Reaper implements Closeable {

    private final Path inputDir;
    private boolean failed;

    private Reaper(Path inputDir) {
        this.inputDir = inputDir;
        this.failed = false;
    }

    /**
     * Entry point. Blocks on stdin until the parent goes away, then reaps.
     *
     * @param args a single element: the directory containing the {@code .cmd} files
     * @throws Exception if cleaning up the input directory fails unexpectedly
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Expected one argument.\nUsage: java -jar reaper.jar <DIR_OF_REAPING_COMMANDS>");
            System.exit(1);
        }
        Path inputDir = Paths.get(args[0]);

        try (Reaper reaper = new Reaper(inputDir)) {
            try {
                System.in.read();
            } catch (IOException e) {
                // A read error on the pipe still means the parent is no longer there to talk to.
                // Reaping must happen regardless; otherwise close() would delete the unprocessed inputs.
                System.err.println("Reading from stdin failed, assuming the parent process is gone: " + e);
            }
            reaper.reap();
        }
    }

    /**
     * Runs the command of every {@code .cmd} file in the input directory.
     * A failure for one file is recorded but never prevents the other files from being processed.
     */
    private void reap() {
        final List<Path> inputFiles;
        try (Stream<Path> stream = Files.list(inputDir)) {
            inputFiles = stream.filter(p -> p.getFileName().toString().endsWith(".cmd")).collect(Collectors.toList());
        } catch (IOException | RuntimeException e) {
            logFailure("Failed to reap inputs", e);
            return;
        }

        for (Path inputFile : inputFiles) {
            try {
                reapFile(inputFile);
            } catch (InterruptedException e) {
                // restore the flag and stop: every further waitFor() would fail immediately anyway
                Thread.currentThread().interrupt();
                logFailure("Interrupted while reaping [" + inputFile + "]", e);
                return;
            } catch (Exception e) {
                logFailure("Failed to reap [" + inputFile + "]", e);
            }
        }
    }

    /**
     * Executes the command stored in a single input file and deletes the file if the command succeeded.
     */
    private void reapFile(Path inputFile) throws IOException, InterruptedException {
        System.out.println("Process file: " + inputFile);
        String line = Files.readString(inputFile);
        System.out.println("Running command: " + line);
        String[] command = line.split(" ");
        Process process = Runtime.getRuntime().exec(command);
        // Drain the output before waiting for the exit code: a child that fills the stdout pipe
        // buffer would otherwise never exit and waitFor() would block forever.
        System.out.print("Stdout: ");
        process.getInputStream().transferTo(System.out);
        System.out.print("\nStderr: ");
        process.getErrorStream().transferTo(System.out);
        System.out.println(); // end the stream
        int ret = process.waitFor();
        if (ret != 0) {
            logFailure("Command [" + line + "] failed with exit code " + ret, null);
        } else {
            delete(inputFile);
        }
    }

    /**
     * Writes the failure to stderr and remembers that reaping was not fully successful,
     * which keeps the input directory around for inspection.
     */
    private void logFailure(String message, Exception e) {
        System.err.println(message);
        if (e != null) {
            e.printStackTrace(System.err);
        }
        failed = true;
    }

    private void delete(Path toDelete) {
        try {
            Files.delete(toDelete);
        } catch (IOException e) {
            logFailure("Failed to delete [" + toDelete + "]", e);
        }
    }

    /**
     * Removes the whole input directory, but only if nothing failed so far.
     */
    @Override
    public void close() {
        if (failed == false) {
            try (Stream<Path> stream = Files.walk(inputDir)) {
                // reverse order visits children before their parent directory
                stream.sorted(Comparator.reverseOrder()).forEach(this::delete);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}

1
buildSrc/settings.gradle Normal file
View File

@ -0,0 +1 @@
// Adds the 'reaper' subproject to this build.
include 'reaper'

View File

@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gradle;
import org.gradle.api.Plugin;
import org.gradle.api.Project;
import java.nio.file.Path;
/**
 * Root-project plugin that exposes a {@link ReaperService} extension named {@code reaper}
 * and shuts that service down once the build has finished. The service is what allows
 * external processes spawned by the build to be cleaned up if Gradle itself dies.
 */
public class ReaperPlugin implements Plugin<Project> {

    @Override
    public void apply(Project project) {
        final boolean isRootProject = project == project.getRootProject();
        if (isRootProject == false) {
            throw new IllegalArgumentException("ReaperPlugin can only be applied to the root project of a build");
        }

        // every Gradle process gets its own command directory, keyed by its pid
        final Path reaperBaseDir = project.getRootDir().toPath().resolve(".gradle").resolve("reaper");
        final Path commandDir = reaperBaseDir.resolve("build-" + ProcessHandle.current().pid());

        final ReaperService reaperService = project.getExtensions().create(
            "reaper",
            ReaperService.class,
            project.getLogger(),
            project.getBuildDir().toPath(),
            commandDir
        );

        project.getGradle().buildFinished(buildResult -> reaperService.shutdown());
    }
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.gradle;
import org.gradle.api.GradleException;
import org.gradle.api.logging.Logger;
import org.gradle.internal.jvm.Jvm;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
/**
 * Build-side counterpart of the reaper process.
 *
 * Services register a command (typically "kill this pid") under a service id; the command is
 * written to {@code <inputDir>/<serviceId>.cmd}. The reaper process is launched lazily on the
 * first registration and is connected to this JVM through a stdin pipe: closing that pipe,
 * either explicitly in {@link #shutdown()} or implicitly because this JVM died, makes the
 * reaper execute whatever command files are still present.
 */
public class ReaperService {

    private final Logger logger;
    private final Path buildDir;
    private final Path inputDir;
    private final Path logFile;
    private volatile Process reaperProcess;

    /**
     * @param logger   logger used for progress messages
     * @param buildDir directory the reaper jar is extracted into (under {@code reaper/})
     * @param inputDir directory the command files and the reaper log are written to
     */
    public ReaperService(Logger logger, Path buildDir, Path inputDir) {
        this.logger = logger;
        this.buildDir = buildDir;
        this.inputDir = inputDir;
        this.logFile = inputDir.resolve("reaper.log");
    }

    /**
     * Register a pid that will be killed by the reaper.
     *
     * @param serviceId identifier used as the command file name
     * @param pid       process id to kill forcibly
     */
    public void registerPid(String serviceId, long pid) {
        String[] killPidCommand = OS.<String[]>conditional()
            // the Windows tool is called "taskkill"
            .onWindows(() -> new String[]{"taskkill", "/F", "/PID", String.valueOf(pid)})
            .onUnix(() -> new String[]{"kill", "-9", String.valueOf(pid)})
            .supply();
        registerCommand(serviceId, killPidCommand);
    }

    /**
     * Register a system command that will be run by the reaper.
     *
     * @param serviceId identifier used as the command file name
     * @param command   command and arguments; they are joined with single spaces, which is
     *                  also how the reaper splits them again
     * @throws UncheckedIOException if the command file cannot be written
     */
    public void registerCommand(String serviceId, String... command) {
        ensureReaperStarted();

        try {
            Files.writeString(inputDir.resolve(serviceId + ".cmd"), String.join(" ", command));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Removes the command registered for the given service, if any.
     *
     * @throws UncheckedIOException if the command file exists but cannot be deleted
     */
    public void unregister(String serviceId) {
        try {
            Files.deleteIfExists(inputDir.resolve(serviceId + ".cmd"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Signals the reaper to run by closing its stdin and waits for it to exit.
     * Does nothing if the reaper was never started.
     *
     * @throws GradleException if the reaper exits with a non-zero code
     */
    void shutdown() {
        if (reaperProcess == null) {
            return;
        }
        ensureReaperAlive();

        final int exitCode;
        try {
            reaperProcess.getOutputStream().close();
            logger.info("Waiting for reaper to exit normally");
            exitCode = reaperProcess.waitFor();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }

        if (exitCode != 0) {
            // point at the file the reaper output is actually redirected to
            throw new GradleException("Reaper process failed. Check log at " + logFile + " for details");
        }
    }

    /**
     * Extracts the embedded reaper jar and launches the reaper on first use; on later calls
     * only verifies that the already launched process is still alive.
     */
    private synchronized void ensureReaperStarted() {
        if (reaperProcess != null) {
            ensureReaperAlive();
            return;
        }

        try {
            // copy the reaper jar
            Path jarPath = buildDir.resolve("reaper").resolve("reaper.jar");
            Files.createDirectories(jarPath.getParent());
            try (InputStream jarInput = ReaperPlugin.class.getResourceAsStream("/META-INF/reaper.jar")) {
                if (jarInput == null) {
                    throw new IllegalStateException("Could not find embedded resource /META-INF/reaper.jar");
                }
                try (OutputStream out = Files.newOutputStream(jarPath)) {
                    jarInput.transferTo(out);
                }
            }

            // ensure the input directory exists
            Files.createDirectories(inputDir);

            // start the reaper
            ProcessBuilder builder = new ProcessBuilder(
                Jvm.current().getJavaExecutable().toString(), // same jvm as gradle
                "-Xms4m", "-Xmx16m", // no need for a big heap, just need to read some files and execute
                "-jar", jarPath.toString(),
                inputDir.toString());
            logger.info("Launching reaper: " + String.join(" ", builder.command()));
            // be explicit for stdin, we use closing of the pipe to signal shutdown to the reaper
            builder.redirectInput(ProcessBuilder.Redirect.PIPE);
            // Merge stderr into stdout so both end up in the log through a single file handle;
            // two independent redirects to the same file can overwrite each other's output.
            builder.redirectErrorStream(true);
            builder.redirectOutput(logFile.toFile());
            reaperProcess = builder.start();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * @throws IllegalStateException if the reaper process is no longer running
     */
    private void ensureReaperAlive() {
        if (reaperProcess.isAlive() == false) {
            throw new IllegalStateException("Reaper process died unexpectedly! Check the log at " + logFile.toString());
        }
    }
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.gradle.testclusters;
import org.elasticsearch.gradle.ElasticsearchDistribution;
import org.elasticsearch.gradle.FileSupplier;
import org.elasticsearch.gradle.PropertyNormalization;
import org.elasticsearch.gradle.ReaperService;
import org.elasticsearch.gradle.http.WaitForHttpResource;
import org.gradle.api.Named;
import org.gradle.api.NamedDomainObjectContainer;
@ -61,19 +62,21 @@ public class ElasticsearchCluster implements TestClusterConfiguration, Named {
private final Function<Integer, ElasticsearchDistribution> distributionFactory;
private final LinkedHashMap<String, Predicate<TestClusterConfiguration>> waitConditions = new LinkedHashMap<>();
private final Project project;
private final ReaperService reaper;
public ElasticsearchCluster(String path, String clusterName, Project project,
public ElasticsearchCluster(String path, String clusterName, Project project, ReaperService reaper,
Function<Integer, ElasticsearchDistribution> distributionFactory, File workingDirBase) {
this.path = path;
this.clusterName = clusterName;
this.project = project;
this.reaper = reaper;
this.distributionFactory = distributionFactory;
this.workingDirBase = workingDirBase;
this.nodes = project.container(ElasticsearchNode.class);
this.nodes.add(
new ElasticsearchNode(
path, clusterName + "-0",
project, workingDirBase, distributionFactory.apply(0)
project, reaper, workingDirBase, distributionFactory.apply(0)
)
);
// configure the cluster name eagerly so nodes know about it
@ -97,7 +100,7 @@ public class ElasticsearchCluster implements TestClusterConfiguration, Named {
for (int i = nodes.size() ; i < numberOfNodes; i++) {
this.nodes.add(new ElasticsearchNode(
path, clusterName + "-" + i, project, workingDirBase, distributionFactory.apply(i)
path, clusterName + "-" + i, project, reaper, workingDirBase, distributionFactory.apply(i)
));
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.gradle.LazyPropertyMap;
import org.elasticsearch.gradle.LoggedExec;
import org.elasticsearch.gradle.OS;
import org.elasticsearch.gradle.PropertyNormalization;
import org.elasticsearch.gradle.ReaperService;
import org.elasticsearch.gradle.Version;
import org.elasticsearch.gradle.VersionProperties;
import org.elasticsearch.gradle.http.WaitForHttpResource;
@ -108,6 +109,7 @@ public class ElasticsearchNode implements TestClusterConfiguration {
private final String path;
private final String name;
private final Project project;
private final ReaperService reaper;
private final AtomicBoolean configurationFrozen = new AtomicBoolean(false);
private final Path workingDir;
@ -142,11 +144,12 @@ public class ElasticsearchNode implements TestClusterConfiguration {
private Function<String, String> nameCustomization = Function.identity();
private boolean isWorkingDirConfigured = false;
ElasticsearchNode(String path, String name, Project project, File workingDirBase,
ElasticsearchNode(String path, String name, Project project, ReaperService reaper, File workingDirBase,
ElasticsearchDistribution distribution) {
this.path = path;
this.name = name;
this.project = project;
this.reaper = reaper;
this.workingDir = workingDirBase.toPath().resolve(safeName(name)).toAbsolutePath();
this.distribution = distribution;
confPathRepo = workingDir.resolve("repo");
@ -632,6 +635,7 @@ public class ElasticsearchNode implements TestClusterConfiguration {
} catch (IOException e) {
throw new TestClustersException("Failed to start ES process for " + this, e);
}
reaper.registerPid(toString(), esProcess.pid());
}
@Override
@ -717,6 +721,8 @@ public class ElasticsearchNode implements TestClusterConfiguration {
if (processHandle.isAlive()) {
throw new TestClustersException("Was not able to terminate elasticsearch process for " + this);
}
reaper.unregister(toString());
}
private void logProcessInfo(String prefix, ProcessHandle.Info info) {

View File

@ -1,57 +0,0 @@
package org.elasticsearch.gradle.testclusters;
import org.gradle.api.logging.Logger;
import org.gradle.api.logging.Logging;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
/**
 * Tracks the clusters that are currently of interest and stops them when the thread
 * running this task is interrupted.
 *
 * The task itself only sleeps; it relies on being interrupted (Gradle interrupts the
 * threads in the daemon when the build completes) to trigger the cleanup.
 */
public class TestClusterCleanupOnShutdown implements Runnable {

    private final Logger logger = Logging.getLogger(TestClusterCleanupOnShutdown.class);

    private Set<ElasticsearchCluster> clustersToWatch = new HashSet<>();

    /** Starts tracking the given clusters. */
    public synchronized void watch(Collection<ElasticsearchCluster> clusters) {
        clustersToWatch.addAll(clusters);
    }

    /** Stops tracking the given clusters. */
    public synchronized void unWatch(Collection<ElasticsearchCluster> clusters) {
        clustersToWatch.removeAll(clusters);
    }

    /** Sleeps until interrupted, then shuts down every tracked cluster. */
    @Override
    public void run() {
        try {
            for (;;) {
                Thread.sleep(Long.MAX_VALUE);
            }
        } catch (InterruptedException interrupted) {
            shutdownClusters();
        }
    }

    /**
     * Stops and forgets every tracked cluster. A cluster that fails to stop is logged
     * and does not prevent the remaining ones from being stopped.
     */
    public synchronized void shutdownClusters() {
        if (clustersToWatch.isEmpty() == false) {
            logger.info("Cleanup thread was interrupted, shutting down all clusters");
            for (Iterator<ElasticsearchCluster> remaining = clustersToWatch.iterator(); remaining.hasNext(); ) {
                final ElasticsearchCluster next = remaining.next();
                // forget the cluster first so it is never retried, whatever stop() does
                remaining.remove();
                stopAndLog(next);
            }
        }
    }

    private void stopAndLog(ElasticsearchCluster cluster) {
        try {
            cluster.stop(false);
        } catch (Exception e) {
            logger.warn("Could not shut down {}", cluster, e);
        }
    }
}

View File

@ -1,73 +0,0 @@
package org.elasticsearch.gradle.testclusters;
import org.gradle.api.Project;
import org.gradle.api.logging.Logger;
import org.gradle.api.logging.Logging;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Extension used internally by testclusters to own the cluster cleanup machinery.
 *
 * It runs a {@link TestClusterCleanupOnShutdown} task on a single-thread executor and
 * registers a JVM shutdown hook that stops the watched clusters. When the build finishes
 * the executor is shut down and the shutdown hook is removed again, so no threads or
 * hooks are leaked in the daemon.
 */
public class TestClustersCleanupExtension {

    private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1;
    private static final TimeUnit EXECUTOR_SHUTDOWN_TIMEOUT_UNIT = TimeUnit.MINUTES;

    private static final Logger logger = Logging.getLogger(TestClustersCleanupExtension.class);

    private final ExecutorService executorService;
    private final TestClusterCleanupOnShutdown cleanupThread;

    public TestClustersCleanupExtension() {
        cleanupThread = new TestClusterCleanupOnShutdown();
        executorService = Executors.newSingleThreadExecutor();
        executorService.submit(cleanupThread);
    }

    /**
     * Creates the extension on the root project unless one already exists there,
     * and wires up the shutdown hook and the build-finished cleanup.
     */
    public static void createExtension(Project project) {
        final Project root = project.getRootProject();
        final boolean alreadyCreated = root.getExtensions().findByType(TestClustersCleanupExtension.class) != null;
        if (alreadyCreated) {
            return;
        }

        // Configure the extension on the root project so we have a single instance per run
        final TestClustersCleanupExtension extension = root.getExtensions().create(
            "__testclusters_rate_limit",
            TestClustersCleanupExtension.class
        );
        final Thread hook = new Thread(extension.cleanupThread::shutdownClusters);
        Runtime.getRuntime().addShutdownHook(hook);

        project.getGradle().buildFinished(buildResult -> {
            extension.stopExecutor();
            removeHook(hook);
        });
    }

    /**
     * Interrupts the cleanup task and waits for the executor to terminate.
     *
     * @throws IllegalStateException if the executor does not terminate within the timeout
     */
    private void stopExecutor() {
        executorService.shutdownNow();
        try {
            final boolean terminated = executorService.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIMEOUT_UNIT);
            if (terminated == false) {
                throw new IllegalStateException(
                    "Failed to shut down executor service after " +
                        EXECUTOR_SHUTDOWN_TIMEOUT + " " + EXECUTOR_SHUTDOWN_TIMEOUT_UNIT
                );
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Deregisters the shutdown hook, logging a warning if that is not possible. */
    private static void removeHook(Thread hook) {
        try {
            final boolean removed = Runtime.getRuntime().removeShutdownHook(hook);
            if (removed == false) {
                logger.warn("Trying to deregister shutdown hook when it was not registered.");
            }
        } catch (IllegalStateException ese) {
            // Thrown when shutdown is in progress
            logger.warn("Can't remove shutdown hook", ese);
        }
    }

    public TestClusterCleanupOnShutdown getCleanupThread() {
        return cleanupThread;
    }
}

View File

@ -21,6 +21,8 @@ package org.elasticsearch.gradle.testclusters;
import groovy.lang.Closure;
import org.elasticsearch.gradle.DistributionDownloadPlugin;
import org.elasticsearch.gradle.ElasticsearchDistribution;
import org.elasticsearch.gradle.ReaperPlugin;
import org.elasticsearch.gradle.ReaperService;
import org.elasticsearch.gradle.test.RestTestRunnerTask;
import org.gradle.api.NamedDomainObjectContainer;
import org.gradle.api.Plugin;
@ -56,15 +58,18 @@ public class TestClustersPlugin implements Plugin<Project> {
private final Set<ElasticsearchCluster> runningClusters = new HashSet<>();
private final Boolean allowClusterToSurvive = Boolean.valueOf(System.getProperty(TESTCLUSTERS_INSPECT_FAILURE, "false"));
private ReaperService reaper;
@Override
public void apply(Project project) {
project.getPlugins().apply(DistributionDownloadPlugin.class);
project.getRootProject().getPluginManager().apply(ReaperPlugin.class);
reaper = project.getRootProject().getExtensions().getByType(ReaperService.class);
// enable the DSL to describe clusters
NamedDomainObjectContainer<ElasticsearchCluster> container = createTestClustersContainerExtension(project);
TestClustersCleanupExtension.createExtension(project);
// provide a task to be able to list defined clusters.
createListClustersTask(project, container);
@ -94,6 +99,7 @@ public class TestClustersPlugin implements Plugin<Project> {
project.getPath(),
name,
project,
reaper,
i -> distros.create(name + "-" + i),
new File(project.getBuildDir(), "testclusters")
)
@ -181,10 +187,6 @@ public class TestClustersPlugin implements Plugin<Project> {
.filter(cluster -> runningClusters.contains(cluster) == false)
.collect(Collectors.toList());
project.getRootProject().getExtensions()
.getByType(TestClustersCleanupExtension.class)
.getCleanupThread()
.watch(neededButNotRunning);
neededButNotRunning
.forEach(elasticsearchCluster -> {
elasticsearchCluster.start();
@ -233,11 +235,6 @@ public class TestClustersPlugin implements Plugin<Project> {
stopCluster(cluster, false);
runningClusters.remove(cluster);
});
project.getRootProject().getExtensions()
.getByType(TestClustersCleanupExtension.class)
.getCleanupThread()
.unWatch(stoppingClusers);
}
}
@Override

View File

@ -5,6 +5,7 @@ rootProject.name = dirName
List projects = [
'build-tools',
'build-tools:reaper',
'rest-api-spec',
'docs',
'client:rest',
@ -117,6 +118,7 @@ if (isEclipse) {
include projects.toArray(new String[0])
project(':build-tools').projectDir = new File(rootProject.projectDir, 'buildSrc')
project(':build-tools:reaper').projectDir = new File(rootProject.projectDir, 'buildSrc/reaper')
project(":libs").children.each { libsProject ->
libsProject.name = "elasticsearch-${libsProject.name}"