Simplify testclusters, don't allow cross project clusters (#40972)

* Simplify testclusters, don't allow cross project clusters
This commit is contained in:
Alpar Torok 2019-04-12 12:38:29 +03:00
parent bee892006a
commit ff64314b2b

View File

@ -42,11 +42,11 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -56,19 +56,18 @@ public class TestClustersPlugin implements Plugin<Project> {
private static final String LIST_TASK_NAME = "listTestClusters"; private static final String LIST_TASK_NAME = "listTestClusters";
private static final String NODE_EXTENSION_NAME = "testClusters"; private static final String NODE_EXTENSION_NAME = "testClusters";
static final String HELPER_CONFIGURATION_NAME = "testclusters"; private static final String HELPER_CONFIGURATION_NAME = "testclusters";
private static final String SYNC_ARTIFACTS_TASK_NAME = "syncTestClustersArtifacts"; private static final String SYNC_ARTIFACTS_TASK_NAME = "syncTestClustersArtifacts";
private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1; private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1;
private static final TimeUnit EXECUTOR_SHUTDOWN_TIMEOUT_UNIT = TimeUnit.MINUTES; private static final TimeUnit EXECUTOR_SHUTDOWN_TIMEOUT_UNIT = TimeUnit.MINUTES;
private static final Logger logger = Logging.getLogger(TestClustersPlugin.class); private static final Logger logger = Logging.getLogger(TestClustersPlugin.class);
// this is static because we need a single mapping across multi project builds, as some of the listeners we use, private final Map<Task, List<ElasticsearchCluster>> usedClusters = new HashMap<>();
// like task graph are singletons across multi project builds. private final Map<ElasticsearchCluster, Integer> claimsInventory = new HashMap<>();
private static final Map<Task, List<ElasticsearchCluster>> usedClusters = new ConcurrentHashMap<>(); private final Set<ElasticsearchCluster> runningClusters =new HashSet<>();
private static final Map<ElasticsearchCluster, Integer> claimsInventory = new ConcurrentHashMap<>(); private final Thread shutdownHook = new Thread(this::shutDownAllClusters);
private static final Set<ElasticsearchCluster> runningClusters = Collections.synchronizedSet(new HashSet<>()); private ExecutorService executorService = Executors.newSingleThreadExecutor();
private static volatile ExecutorService executorService;
@Override @Override
public void apply(Project project) { public void apply(Project project) {
@ -81,10 +80,8 @@ public class TestClustersPlugin implements Plugin<Project> {
createListClustersTask(project, container); createListClustersTask(project, container);
// create DSL for tasks to mark clusters these use // create DSL for tasks to mark clusters these use
createUseClusterTaskExtension(project); createUseClusterTaskExtension(project, container);
// There's a single Gradle instance for multi project builds, this means that some configuration needs to be
// done only once even if the plugin is applied multiple times as a part of multi project build
if (rootProject.getConfigurations().findByName(HELPER_CONFIGURATION_NAME) == null) { if (rootProject.getConfigurations().findByName(HELPER_CONFIGURATION_NAME) == null) {
// We use a single configuration on the root project to resolve all testcluster dependencies ( like distros ) // We use a single configuration on the root project to resolve all testcluster dependencies ( like distros )
// at once, only once without the need to repeat it for each project. This pays off assuming that most // at once, only once without the need to repeat it for each project. This pays off assuming that most
@ -95,18 +92,14 @@ public class TestClustersPlugin implements Plugin<Project> {
"ES distributions and plugins." "ES distributions and plugins."
); );
// When running in the Daemon it's possible for this to hold references to past
usedClusters.clear();
claimsInventory.clear();
runningClusters.clear();
// We have a single task to sync the helper configuration to "artifacts dir" // We have a single task to sync the helper configuration to "artifacts dir"
// the clusters will look for artifacts there based on the naming conventions. // the clusters will look for artifacts there based on the naming conventions.
// Tasks that use a cluster will add this as a dependency automatically so it's guaranteed to run early in // Tasks that use a cluster will add this as a dependency automatically so it's guaranteed to run early in
// the build. // the build.
rootProject.getTasks().create(SYNC_ARTIFACTS_TASK_NAME, sync -> { rootProject.getTasks().create(SYNC_ARTIFACTS_TASK_NAME, sync -> {
sync.getInputs().files((Callable<FileCollection>) helperConfiguration::getAsFileTree); sync.getInputs().files((Callable<FileCollection>) helperConfiguration::getAsFileTree);
sync.getOutputs().dir(getTestClustersConfigurationExtractDir(project)); sync.getOutputs().dir(new File(project.getRootProject().getBuildDir(), "testclusters/extract"));
// NOTE: Gradle doesn't allow a lambda here ( fails at runtime )
sync.doLast(new Action<Task>() { sync.doLast(new Action<Task>() {
@Override @Override
public void execute(Task task) { public void execute(Task task) {
@ -121,33 +114,33 @@ public class TestClustersPlugin implements Plugin<Project> {
} else { } else {
throw new IllegalArgumentException("Can't extract " + file + " unknown file extension"); throw new IllegalArgumentException("Can't extract " + file + " unknown file extension");
} }
spec.from(files).into(getTestClustersConfigurationExtractDir(project) + "/" + spec.from(files).into(new File(project.getRootProject().getBuildDir(), "testclusters/extract") + "/" +
resolvedArtifact.getModuleVersion().getId().getGroup() resolvedArtifact.getModuleVersion().getId().getGroup()
); );
})); }));
} }
}); });
}); });
// When we know what tasks will run, we claim the clusters of those task to differentiate between clusters
// that are defined in the build script and the ones that will actually be used in this invocation of gradle
// we use this information to determine when the last task that required the cluster executed so that we can
// terminate the cluster right away and free up resources.
configureClaimClustersHook(project);
// Before each task, we determine if a cluster needs to be started for that task.
configureStartClustersHook(project);
// After each task we determine if there are clusters that are no longer needed.
configureStopClustersHook(project);
// configure hooks to make sure no test cluster processes survive the build
configureCleanupHooks(project);
// Since we have everything modeled in the DSL, add all the required dependencies e.x. the distribution to the
// configuration so the user doesn't have to repeat this.
autoConfigureClusterDependencies(project, rootProject, container);
} }
// When we know what tasks will run, we claim the clusters of those task to differentiate between clusters
// that are defined in the build script and the ones that will actually be used in this invocation of gradle
// we use this information to determine when the last task that required the cluster executed so that we can
// terminate the cluster right away and free up resources.
configureClaimClustersHook(project);
// Before each task, we determine if a cluster needs to be started for that task.
configureStartClustersHook(project);
// After each task we determine if there are clusters that are no longer needed.
configureStopClustersHook(project);
// configure hooks to make sure no test cluster processes survive the build
configureCleanupHooks(project);
// Since we have everything modeled in the DSL, add all the required dependencies e.x. the distribution to the
// configuration so the user doesn't have to repeat this.
autoConfigureClusterDependencies(project, rootProject, container);
} }
private NamedDomainObjectContainer<ElasticsearchCluster> createTestClustersContainerExtension(Project project) { private NamedDomainObjectContainer<ElasticsearchCluster> createTestClustersContainerExtension(Project project) {
@ -158,7 +151,7 @@ public class TestClustersPlugin implements Plugin<Project> {
project.getPath(), project.getPath(),
name, name,
project, project,
getTestClustersConfigurationExtractDir(project), new File(project.getRootProject().getBuildDir(), "testclusters/extract"),
new File(project.getBuildDir(), "testclusters") new File(project.getBuildDir(), "testclusters")
) )
); );
@ -178,7 +171,7 @@ public class TestClustersPlugin implements Plugin<Project> {
); );
} }
private static void createUseClusterTaskExtension(Project project) { private void createUseClusterTaskExtension(Project project, NamedDomainObjectContainer<ElasticsearchCluster> container) {
// register an extension for all current and future tasks, so that any task can declare that it wants to use a // register an extension for all current and future tasks, so that any task can declare that it wants to use a
// specific cluster. // specific cluster.
project.getTasks().all((Task task) -> project.getTasks().all((Task task) ->
@ -187,6 +180,12 @@ public class TestClustersPlugin implements Plugin<Project> {
"useCluster", "useCluster",
new Closure<Void>(project, task) { new Closure<Void>(project, task) {
public void doCall(ElasticsearchCluster cluster) { public void doCall(ElasticsearchCluster cluster) {
if (container.contains(cluster) == false) {
throw new TestClustersException(
"Task " + task.getPath() + " can't use test cluster from" +
" another project " + cluster
);
}
Object thisObject = this.getThisObject(); Object thisObject = this.getThisObject();
if (thisObject instanceof Task == false) { if (thisObject instanceof Task == false) {
throw new AssertionError("Expected " + thisObject + " to be an instance of " + throw new AssertionError("Expected " + thisObject + " to be an instance of " +
@ -201,35 +200,38 @@ public class TestClustersPlugin implements Plugin<Project> {
); );
} }
private static void configureClaimClustersHook(Project project) { private void configureClaimClustersHook(Project project) {
project.getGradle().getTaskGraph().whenReady(taskExecutionGraph -> // Once we know all the tasks that need to execute, we claim all the clusters that belong to those and count the
taskExecutionGraph.getAllTasks() // claims so we'll know when it's safe to stop them.
.forEach(task -> project.getGradle().getTaskGraph().whenReady(taskExecutionGraph -> {
usedClusters.getOrDefault(task, Collections.emptyList()).forEach(each -> { Set<String> forExecution = taskExecutionGraph.getAllTasks().stream()
synchronized (claimsInventory) { .map(Task::getPath)
claimsInventory.put(each, claimsInventory.getOrDefault(each, 0) + 1); .collect(Collectors.toSet());
}
each.freeze(); usedClusters.forEach((task, listOfClusters) ->
}) listOfClusters.forEach(elasticsearchCluster -> {
) if (forExecution.contains(task.getPath())) {
); elasticsearchCluster.freeze();
claimsInventory.put(elasticsearchCluster, claimsInventory.getOrDefault(elasticsearchCluster, 0) + 1);
}
}));
logger.info("Claims inventory: {}", claimsInventory);
});
} }
private static void configureStartClustersHook(Project project) { private void configureStartClustersHook(Project project) {
project.getGradle().addListener( project.getGradle().addListener(
new TaskActionListener() { new TaskActionListener() {
@Override @Override
public void beforeActions(Task task) { public void beforeActions(Task task) {
// we only start the cluster before the actions, so we'll not start it if the task is up-to-date // we only start the cluster before the actions, so we'll not start it if the task is up-to-date
final List<ElasticsearchCluster> clustersToStart; usedClusters.getOrDefault(task, Collections.emptyList()).stream()
synchronized (runningClusters) { .filter(each -> runningClusters.contains(each) == false)
clustersToStart = usedClusters.getOrDefault(task,Collections.emptyList()).stream() .forEach(elasticsearchCluster -> {
.filter(each -> runningClusters.contains(each) == false) elasticsearchCluster.start();
.collect(Collectors.toList()); runningClusters.add(elasticsearchCluster);
runningClusters.addAll(clustersToStart); });
}
clustersToStart.forEach(ElasticsearchCluster::start);
} }
@Override @Override
public void afterActions(Task task) {} public void afterActions(Task task) {}
@ -237,7 +239,7 @@ public class TestClustersPlugin implements Plugin<Project> {
); );
} }
private static void configureStopClustersHook(Project project) { private void configureStopClustersHook(Project project) {
project.getGradle().addListener( project.getGradle().addListener(
new TaskExecutionListener() { new TaskExecutionListener() {
@Override @Override
@ -251,25 +253,19 @@ public class TestClustersPlugin implements Plugin<Project> {
if (state.getFailure() != null) { if (state.getFailure() != null) {
// If the task fails, and other tasks use this cluster, the other task will likely never be // If the task fails, and other tasks use this cluster, the other task will likely never be
// executed at all, so we will never get to un-claim and terminate it. // executed at all, so we will never get to un-claim and terminate it.
// The downside is that with multi project builds if that other task is in a different
// project and executing right now, we may terminate the cluster while it's running it.
clustersUsedByTask.forEach(each -> each.stop(true)); clustersUsedByTask.forEach(each -> each.stop(true));
} else { } else {
clustersUsedByTask.forEach(each -> { clustersUsedByTask.forEach(
synchronized (claimsInventory) { each -> claimsInventory.put(each, claimsInventory.getOrDefault(each, 0) - 1)
claimsInventory.put(each, claimsInventory.get(each) - 1); );
} claimsInventory.entrySet().stream()
}); .filter(entry -> entry.getValue() == 0)
final List<ElasticsearchCluster> stoppable; .filter(entry -> runningClusters.contains(entry.getKey()))
synchronized (runningClusters) { .map(Map.Entry::getKey)
stoppable = claimsInventory.entrySet().stream() .forEach(each -> {
.filter(entry -> entry.getValue() == 0) each.stop(false);
.filter(entry -> runningClusters.contains(entry.getKey())) runningClusters.remove(each);
.map(Map.Entry::getKey) });
.collect(Collectors.toList());
runningClusters.removeAll(stoppable);
}
stoppable.forEach(each -> each.stop(false));
} }
} }
@Override @Override
@ -278,10 +274,6 @@ public class TestClustersPlugin implements Plugin<Project> {
); );
} }
static File getTestClustersConfigurationExtractDir(Project project) {
return new File(project.getRootProject().getBuildDir(), "testclusters/extract");
}
/** /**
* Boilerplate to get testClusters container extension * Boilerplate to get testClusters container extension
* *
@ -354,15 +346,9 @@ public class TestClustersPlugin implements Plugin<Project> {
}))); })));
} }
private static void configureCleanupHooks(Project project) { private void configureCleanupHooks(Project project) {
synchronized (runningClusters) {
if (executorService == null || executorService.isTerminated()) {
executorService = Executors.newSingleThreadExecutor();
} else {
throw new IllegalStateException("Trying to configure executor service twice");
}
}
// When the Gradle daemon is used, it will interrupt all threads when the build concludes. // When the Gradle daemon is used, it will interrupt all threads when the build concludes.
// This is our signal to clean up
executorService.submit(() -> { executorService.submit(() -> {
while (true) { while (true) {
try { try {
@ -375,17 +361,21 @@ public class TestClustersPlugin implements Plugin<Project> {
} }
}); });
project.getGradle().buildFinished(buildResult -> {
logger.info("Build finished");
shutdownExecutorService();
});
// When the Daemon is not used, or runs into issues, rely on a shutdown hook // When the Daemon is not used, or runs into issues, rely on a shutdown hook
// When the daemon is used, but does not work correctly and eventually dies off (e.x. due to non interruptible // When the daemon is used, but does not work correctly and eventually dies off (e.x. due to non interruptible
// thread in the build) process will be stopped eventually when the daemon dies. // thread in the build) process will be stopped eventually when the daemon dies.
Runtime.getRuntime().addShutdownHook(new Thread(TestClustersPlugin::shutDownAllClusters)); Runtime.getRuntime().addShutdownHook(shutdownHook);
// When we don't run into anything out of the ordinary, and the build completes, makes sure to clean up
project.getGradle().buildFinished(buildResult -> {
shutdownExecutorService();
if (false == Runtime.getRuntime().removeShutdownHook(shutdownHook)) {
logger.info("Trying to deregister shutdown hook when it was not registered.");
}
});
} }
private static void shutdownExecutorService() { private void shutdownExecutorService() {
executorService.shutdownNow(); executorService.shutdownNow();
try { try {
if (executorService.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIMEOUT_UNIT) == false) { if (executorService.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, EXECUTOR_SHUTDOWN_TIMEOUT_UNIT) == false) {
@ -400,13 +390,13 @@ public class TestClustersPlugin implements Plugin<Project> {
} }
} }
private static void shutDownAllClusters() { private void shutDownAllClusters() {
if (logger.isDebugEnabled()) {
logger.debug("Shutting down all test clusters", new RuntimeException());
}
synchronized (runningClusters) { synchronized (runningClusters) {
runningClusters.forEach(each -> each.stop(true)); Iterator<ElasticsearchCluster> iterator = runningClusters.iterator();
runningClusters.clear(); while (iterator.hasNext()) {
iterator.remove();
iterator.next().stop(true);
}
} }
} }