Restrict which tasks can use testclusters (#45198)

* Restrict which tasks can use testclusters

This PR fixes a problem between the interaction of test-clusters and
build cache.
Before this any task could have used a cluster without tracking it as
input.
With this change a new interface is introduced to track the tasks that
can use clusters and we do consider the cluster as input for all of
them.
This commit is contained in:
Alpar Torok 2019-08-09 13:32:27 +03:00
parent 5ddeb488a6
commit 634a070430
18 changed files with 179 additions and 181 deletions

View File

@ -20,6 +20,7 @@ package org.elasticsearch.gradle.test
import org.elasticsearch.gradle.VersionProperties
import org.elasticsearch.gradle.testclusters.ElasticsearchCluster
import org.elasticsearch.gradle.testclusters.RestTestRunnerTask
import org.elasticsearch.gradle.testclusters.TestClustersPlugin
import org.elasticsearch.gradle.tool.ClasspathUtils
import org.gradle.api.DefaultTask
@ -49,8 +50,6 @@ class RestIntegTestTask extends DefaultTask {
protected Test runner
protected Task clusterInit
/** Info about nodes in the integ test cluster. Note this is *not* available until runtime. */
List<NodeInfo> nodes
@ -61,8 +60,6 @@ class RestIntegTestTask extends DefaultTask {
RestIntegTestTask() {
runner = project.tasks.create("${name}Runner", RestTestRunnerTask.class)
super.dependsOn(runner)
clusterInit = project.tasks.create(name: "${name}Cluster#init", dependsOn: project.testClasses)
runner.dependsOn(clusterInit)
boolean usesTestclusters = project.plugins.hasPlugin(TestClustersPlugin.class)
if (usesTestclusters == false) {
clusterConfig = project.extensions.create("${name}Cluster", ClusterConfiguration.class, project)
@ -75,8 +72,6 @@ class RestIntegTestTask extends DefaultTask {
runner.useCluster project.testClusters."$name"
}
// override/add more for rest tests
runner.maxParallelForks = 1
runner.include('**/*IT.class')
runner.systemProperty('tests.rest.load_packaged', 'false')
@ -134,7 +129,6 @@ class RestIntegTestTask extends DefaultTask {
project.gradle.projectsEvaluated {
if (enabled == false) {
runner.enabled = false
clusterInit.enabled = false
return // no need to add cluster formation tasks if the task won't run!
}
if (usesTestclusters == false) {
@ -185,11 +179,6 @@ class RestIntegTestTask extends DefaultTask {
}
}
@Override
public Task mustRunAfter(Object... tasks) {
clusterInit.mustRunAfter(tasks)
}
public void runner(Closure configure) {
project.tasks.getByName("${name}Runner").configure(configure)
}

View File

@ -0,0 +1,17 @@
package org.elasticsearch.gradle.testclusters;
import org.gradle.api.DefaultTask;
import java.util.Collection;
import java.util.HashSet;
public class DefaultTestClustersTask extends DefaultTask implements TestClustersAware {
private Collection<ElasticsearchCluster> clusters = new HashSet<>();
@Override
public Collection<ElasticsearchCluster> getClusters() {
return clusters;
}
}

View File

@ -117,6 +117,10 @@ public class ElasticsearchCluster implements TestClusterConfiguration, Named {
return clusterName;
}
public String getPath() {
return path;
}
@Override
public void setVersion(String version) {
nodes.all(each -> each.setVersion(version));

View File

@ -216,6 +216,9 @@ public class ElasticsearchNode implements TestClusterConfiguration {
public void plugin(URI plugin) {
requireNonNull(plugin, "Plugin name can't be null");
checkFrozen();
if (plugins.contains(plugin)) {
throw new TestClustersException("Plugin already configured for installation " + plugin);
}
this.plugins.add(plugin);
}

View File

@ -1,12 +1,11 @@
package org.elasticsearch.gradle.test;
package org.elasticsearch.gradle.testclusters;
import org.elasticsearch.gradle.testclusters.ElasticsearchCluster;
import org.gradle.api.tasks.CacheableTask;
import org.gradle.api.tasks.Nested;
import org.gradle.api.tasks.testing.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import static org.elasticsearch.gradle.testclusters.TestDistribution.INTEG_TEST;
@ -16,9 +15,9 @@ import static org.elasticsearch.gradle.testclusters.TestDistribution.INTEG_TEST;
* {@link Nested} inputs.
*/
@CacheableTask
public class RestTestRunnerTask extends Test {
public class RestTestRunnerTask extends Test implements TestClustersAware {
private Collection<ElasticsearchCluster> clusters = new ArrayList<>();
private Collection<ElasticsearchCluster> clusters = new HashSet<>();
public RestTestRunnerTask() {
super();
@ -26,12 +25,15 @@ public class RestTestRunnerTask extends Test {
task -> clusters.stream().flatMap(c -> c.getNodes().stream()).anyMatch(n -> n.getTestDistribution() != INTEG_TEST));
}
@Override
public int getMaxParallelForks() {
return 1;
}
@Nested
@Override
public Collection<ElasticsearchCluster> getClusters() {
return clusters;
}
public void testCluster(ElasticsearchCluster cluster) {
this.clusters.add(cluster);
}
}

View File

@ -0,0 +1,26 @@
package org.elasticsearch.gradle.testclusters;
import org.gradle.api.Task;
import org.gradle.api.tasks.Nested;
import java.util.Collection;
interface TestClustersAware extends Task {
@Nested
Collection<ElasticsearchCluster> getClusters();
default void useCluster(ElasticsearchCluster cluster) {
if (cluster.getPath().equals(getProject().getPath()) == false) {
throw new TestClustersException(
"Task " + getPath() + " can't use test cluster from" +
" another project " + cluster
);
}
for (ElasticsearchNode node : cluster.getNodes()) {
this.dependsOn(node.getDistribution().getExtracted());
}
getClusters().add(cluster);
}
}

View File

@ -18,32 +18,22 @@
*/
package org.elasticsearch.gradle.testclusters;
import groovy.lang.Closure;
import org.elasticsearch.gradle.DistributionDownloadPlugin;
import org.elasticsearch.gradle.ElasticsearchDistribution;
import org.elasticsearch.gradle.ReaperPlugin;
import org.elasticsearch.gradle.ReaperService;
import org.elasticsearch.gradle.test.RestTestRunnerTask;
import org.gradle.api.NamedDomainObjectContainer;
import org.gradle.api.Plugin;
import org.gradle.api.Project;
import org.gradle.api.Task;
import org.gradle.api.execution.TaskActionListener;
import org.gradle.api.execution.TaskExecutionListener;
import org.gradle.api.invocation.Gradle;
import org.gradle.api.logging.Logger;
import org.gradle.api.logging.Logging;
import org.gradle.api.plugins.ExtraPropertiesExtension;
import org.gradle.api.tasks.TaskState;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class TestClustersPlugin implements Plugin<Project> {
@ -51,12 +41,6 @@ public class TestClustersPlugin implements Plugin<Project> {
public static final String EXTENSION_NAME = "testClusters";
private static final Logger logger = Logging.getLogger(TestClustersPlugin.class);
private static final String TESTCLUSTERS_INSPECT_FAILURE = "testclusters.inspect.failure";
private final Map<Task, List<ElasticsearchCluster>> usedClusters = new HashMap<>();
private final Map<ElasticsearchCluster, Integer> claimsInventory = new HashMap<>();
private final Set<ElasticsearchCluster> runningClusters = new HashSet<>();
private final Boolean allowClusterToSurvive = Boolean.valueOf(System.getProperty(TESTCLUSTERS_INSPECT_FAILURE, "false"));
private ReaperService reaper;
@ -73,20 +57,22 @@ public class TestClustersPlugin implements Plugin<Project> {
// provide a task to be able to list defined clusters.
createListClustersTask(project, container);
// create DSL for tasks to mark clusters these use
createUseClusterTaskExtension(project, container);
if (project.getRootProject().getExtensions().findByType(TestClustersRegistry.class) == null) {
TestClustersRegistry registry = project.getRootProject().getExtensions()
.create("testClusters", TestClustersRegistry.class);
// When we know what tasks will run, we claim the clusters of those task to differentiate between clusters
// that are defined in the build script and the ones that will actually be used in this invocation of gradle
// we use this information to determine when the last task that required the cluster executed so that we can
// terminate the cluster right away and free up resources.
configureClaimClustersHook(project);
// When we know what tasks will run, we claim the clusters of those task to differentiate between clusters
// that are defined in the build script and the ones that will actually be used in this invocation of gradle
// we use this information to determine when the last task that required the cluster executed so that we can
// terminate the cluster right away and free up resources.
configureClaimClustersHook(project.getGradle(), registry);
// Before each task, we determine if a cluster needs to be started for that task.
configureStartClustersHook(project);
// Before each task, we determine if a cluster needs to be started for that task.
configureStartClustersHook(project.getGradle(), registry);
// After each task we determine if there are clusters that are no longer needed.
configureStopClustersHook(project);
// After each task we determine if there are clusters that are no longer needed.
configureStopClustersHook(project.getGradle(), registry);
}
}
private NamedDomainObjectContainer<ElasticsearchCluster> createTestClustersContainerExtension(Project project) {
@ -120,78 +106,28 @@ public class TestClustersPlugin implements Plugin<Project> {
);
}
private void createUseClusterTaskExtension(Project project, NamedDomainObjectContainer<ElasticsearchCluster> container) {
// register an extension for all current and future tasks, so that any task can declare that it wants to use a
// specific cluster.
project.getTasks().configureEach((Task task) ->
task.getExtensions().findByType(ExtraPropertiesExtension.class)
.set(
"useCluster",
new Closure<Void>(project, task) {
public void doCall(ElasticsearchCluster cluster) {
if (container.contains(cluster) == false) {
throw new TestClustersException(
"Task " + task.getPath() + " can't use test cluster from" +
" another project " + cluster
);
}
Object thisObject = this.getThisObject();
if (thisObject instanceof Task == false) {
throw new AssertionError("Expected " + thisObject + " to be an instance of " +
"Task, but got: " + thisObject.getClass());
}
usedClusters.computeIfAbsent(task, k -> new ArrayList<>()).add(cluster);
for (ElasticsearchNode node : cluster.getNodes()) {
((Task) thisObject).dependsOn(node.getDistribution().getExtracted());
}
if (thisObject instanceof RestTestRunnerTask) {
((RestTestRunnerTask) thisObject).testCluster(cluster);
}
}
})
);
}
private void configureClaimClustersHook(Project project) {
private static void configureClaimClustersHook(Gradle gradle, TestClustersRegistry registry) {
// Once we know all the tasks that need to execute, we claim all the clusters that belong to those and count the
// claims so we'll know when it's safe to stop them.
project.getGradle().getTaskGraph().whenReady(taskExecutionGraph -> {
Set<String> forExecution = taskExecutionGraph.getAllTasks().stream()
.map(Task::getPath)
.collect(Collectors.toSet());
usedClusters.forEach((task, listOfClusters) ->
listOfClusters.forEach(elasticsearchCluster -> {
if (forExecution.contains(task.getPath())) {
elasticsearchCluster.freeze();
claimsInventory.put(elasticsearchCluster, claimsInventory.getOrDefault(elasticsearchCluster, 0) + 1);
}
}));
if (claimsInventory.isEmpty() == false) {
logger.info("Claims inventory: {}", claimsInventory);
}
gradle.getTaskGraph().whenReady(taskExecutionGraph -> {
taskExecutionGraph.getAllTasks().stream()
.filter(task -> task instanceof TestClustersAware)
.map(task -> (TestClustersAware) task)
.flatMap(task -> task.getClusters().stream())
.forEach(registry::claimCluster);
});
}
private void configureStartClustersHook(Project project) {
project.getGradle().addListener(
private static void configureStartClustersHook(Gradle gradle, TestClustersRegistry registry) {
gradle.addListener(
new TaskActionListener() {
@Override
public void beforeActions(Task task) {
if (task instanceof TestClustersAware == false) {
return;
}
// we only start the cluster before the actions, so we'll not start it if the task is up-to-date
List<ElasticsearchCluster> neededButNotRunning = usedClusters.getOrDefault(
task,
Collections.emptyList()
)
.stream()
.filter(cluster -> runningClusters.contains(cluster) == false)
.collect(Collectors.toList());
neededButNotRunning
.forEach(elasticsearchCluster -> {
elasticsearchCluster.start();
runningClusters.add(elasticsearchCluster);
});
((TestClustersAware) task).getClusters().forEach(registry::maybeStartCluster);
}
@Override
public void afterActions(Task task) {}
@ -199,43 +135,18 @@ public class TestClustersPlugin implements Plugin<Project> {
);
}
private void configureStopClustersHook(Project project) {
project.getGradle().addListener(
private static void configureStopClustersHook(Gradle gradle, TestClustersRegistry registry) {
gradle.addListener(
new TaskExecutionListener() {
@Override
public void afterExecute(Task task, TaskState state) {
// always unclaim the cluster, even if _this_ task is up-to-date, as others might not have been
// and caused the cluster to start.
List<ElasticsearchCluster> clustersUsedByTask = usedClusters.getOrDefault(
task,
Collections.emptyList()
);
if (clustersUsedByTask.isEmpty()) {
if (task instanceof TestClustersAware == false) {
return;
}
logger.info("Clusters were used, stopping and releasing permits");
final int permitsToRelease;
if (state.getFailure() != null) {
// If the task fails, and other tasks use this cluster, the other task will likely never be
// executed at all, so we will never be called again to un-claim and terminate it.
clustersUsedByTask.forEach(cluster -> stopCluster(cluster, true));
permitsToRelease = clustersUsedByTask.stream()
.map(cluster -> cluster.getNumberOfNodes())
.reduce(Integer::sum).get();
} else {
clustersUsedByTask.forEach(
cluster -> claimsInventory.put(cluster, claimsInventory.getOrDefault(cluster, 0) - 1)
);
List<ElasticsearchCluster> stoppingClusers = claimsInventory.entrySet().stream()
.filter(entry -> entry.getValue() == 0)
.filter(entry -> runningClusters.contains(entry.getKey()))
.map(Map.Entry::getKey)
.collect(Collectors.toList());
stoppingClusers.forEach(cluster -> {
stopCluster(cluster, false);
runningClusters.remove(cluster);
});
}
// always unclaim the cluster, even if _this_ task is up-to-date, as others might not have been
// and caused the cluster to start.
((TestClustersAware) task).getClusters()
.forEach(cluster -> registry.stopCluster(cluster, state.getFailure() != null));
}
@Override
public void beforeExecute(Task task) {}
@ -243,25 +154,5 @@ public class TestClustersPlugin implements Plugin<Project> {
);
}
private void stopCluster(ElasticsearchCluster cluster, boolean taskFailed) {
if (allowClusterToSurvive) {
logger.info("Not stopping clusters, disabled by property");
if (taskFailed) {
// task failed or this is the last one to stop
for (int i=1 ; ; i += i) {
logger.lifecycle(
"No more test clusters left to run, going to sleep because {} was set," +
" interrupt (^C) to stop clusters.", TESTCLUSTERS_INSPECT_FAILURE
);
try {
Thread.sleep(1000 * i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
}
cluster.stop(taskFailed);
}
}

View File

@ -0,0 +1,66 @@
package org.elasticsearch.gradle.testclusters;
import org.gradle.api.logging.Logger;
import org.gradle.api.logging.Logging;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class TestClustersRegistry {
private static final Logger logger = Logging.getLogger(TestClustersRegistry.class);
private static final String TESTCLUSTERS_INSPECT_FAILURE = "testclusters.inspect.failure";
private final Boolean allowClusterToSurvive = Boolean.valueOf(System.getProperty(TESTCLUSTERS_INSPECT_FAILURE, "false"));
private final Map<ElasticsearchCluster, Integer> claimsInventory = new HashMap<>();
private final Set<ElasticsearchCluster> runningClusters = new HashSet<>();
public void claimCluster(ElasticsearchCluster cluster) {
cluster.freeze();
claimsInventory.put(cluster, claimsInventory.getOrDefault(cluster, 0) + 1);
}
public void maybeStartCluster(ElasticsearchCluster cluster) {
if (runningClusters.contains(cluster)) {
return;
}
runningClusters.add(cluster);
cluster.start();
}
public void stopCluster(ElasticsearchCluster cluster, boolean taskFailed) {
if (taskFailed) {
// If the task fails, and other tasks use this cluster, the other task will likely never be
// executed at all, so we will never be called again to un-claim and terminate it.
if (allowClusterToSurvive) {
logger.info("Not stopping clusters, disabled by property");
// task failed or this is the last one to stop
for (int i = 1; ; i += i) {
logger.lifecycle(
"No more test clusters left to run, going to sleep because {} was set," +
" interrupt (^C) to stop clusters.", TESTCLUSTERS_INSPECT_FAILURE
);
try {
Thread.sleep(1000 * i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
} else {
cluster.stop(false);
runningClusters.remove(cluster);
}
} else {
int currentClaims = claimsInventory.getOrDefault(cluster, 0) - 1;
claimsInventory.put(cluster, currentClaims);
if (currentClaims <= 0 && runningClusters.contains(cluster)) {
cluster.stop(false);
runningClusters.remove(cluster);
}
}
}
}

View File

@ -28,7 +28,7 @@ import java.util.function.Function;
public class LoggedExec extends Exec {
private Consumer<Logger> outputLogger;
public LoggedExec() {
if (getLogger().isInfoEnabled() == false) {

View File

@ -39,8 +39,8 @@ testClusters.'remote-cluster' {
}
task mixedClusterTest(type: RestIntegTestTask) {
useCluster testClusters.'remote-cluster'
runner {
useCluster testClusters.'remote-cluster'
dependsOn 'remote-cluster'
systemProperty 'tests.rest.suite', 'multi_cluster'
}

View File

@ -1,6 +1,7 @@
import org.elasticsearch.gradle.LoggedExec
import org.elasticsearch.gradle.VersionProperties
import org.apache.tools.ant.taskdefs.condition.Os
import org.elasticsearch.gradle.testclusters.DefaultTestClustersTask
import java.nio.charset.StandardCharsets
import java.nio.file.Files
@ -87,7 +88,7 @@ task deploy(type: Copy) {
into "${wildflyInstall}/standalone/deployments"
}
task writeElasticsearchProperties {
task writeElasticsearchProperties(type: DefaultTestClustersTask) {
onlyIf { !Os.isFamily(Os.FAMILY_WINDOWS) }
useCluster testClusters.integTest
dependsOn deploy

View File

@ -38,8 +38,8 @@ task writeJavaPolicy {
task "follow-cluster"(type: RestIntegTestTask) {
dependsOn 'writeJavaPolicy', "leader-cluster"
useCluster testClusters."leader-cluster"
runner {
useCluster testClusters."leader-cluster"
systemProperty 'java.security.policy', "file://${policyFile}"
systemProperty 'tests.target_cluster', 'follow'
nonInputProperties.systemProperty 'tests.leader_host', "${-> testClusters."leader-cluster".getAllHttpSocketURI().get(0)}"

View File

@ -24,8 +24,8 @@ testClusters."leader-cluster" {
task "middle-cluster"(type: RestIntegTestTask) {
dependsOn "leader-cluster"
useCluster testClusters."leader-cluster"
runner {
useCluster testClusters."leader-cluster"
systemProperty 'tests.target_cluster', 'middle'
nonInputProperties.systemProperty 'tests.leader_host',
"${-> testClusters."leader-cluster".getAllHttpSocketURI().get(0)}"
@ -40,9 +40,9 @@ testClusters."middle-cluster" {
task 'follow-cluster'(type: RestIntegTestTask) {
dependsOn "leader-cluster", "middle-cluster"
useCluster testClusters."leader-cluster"
useCluster testClusters."middle-cluster"
runner {
useCluster testClusters."leader-cluster"
useCluster testClusters."middle-cluster"
systemProperty 'tests.target_cluster', 'follow'
nonInputProperties.systemProperty 'tests.leader_host',
"${-> testClusters."leader-cluster".getAllHttpSocketURI().get(0)}"

View File

@ -21,8 +21,8 @@ testClusters.'leader-cluster' {
task 'follow-cluster'(type: RestIntegTestTask) {
dependsOn 'leader-cluster'
useCluster testClusters.'leader-cluster'
runner {
useCluster testClusters.'leader-cluster'
systemProperty 'tests.target_cluster', 'follow'
nonInputProperties.systemProperty 'tests.leader_host',
{ "${testClusters.'follow-cluster'.getAllHttpSocketURI().get(0)}" }

View File

@ -1,4 +1,5 @@
import org.elasticsearch.gradle.test.RestIntegTestTask
import org.elasticsearch.gradle.testclusters.RestTestRunnerTask
apply plugin: 'elasticsearch.testclusters'
apply plugin: 'elasticsearch.standalone-test'
@ -20,8 +21,8 @@ testClusters.'leader-cluster' {
task 'follow-cluster'(type: RestIntegTestTask) {
dependsOn 'leader-cluster'
useCluster testClusters.'leader-cluster'
runner {
useCluster testClusters.'leader-cluster'
systemProperty 'tests.target_cluster', 'follow'
nonInputProperties.systemProperty 'tests.leader_host',
"${-> testClusters.'leader-cluster'.getAllHttpSocketURI().get(0)}"
@ -36,12 +37,11 @@ testClusters.'follow-cluster' {
nameCustomization = { 'follow' }
}
task followClusterRestartTest(type: Test) {
task followClusterRestartTest(type: RestTestRunnerTask) {
dependsOn tasks.'follow-cluster'
useCluster testClusters.'leader-cluster'
useCluster testClusters.'follow-cluster'
maxParallelForks = 1
systemProperty 'tests.rest.load_packaged', 'false'
systemProperty 'tests.target_cluster', 'follow-restart'
doFirst {
@ -49,7 +49,6 @@ task followClusterRestartTest(type: Test) {
nonInputProperties.systemProperty 'tests.leader_host', "${-> testClusters.'leader-cluster'.getAllHttpSocketURI().get(0)}"
nonInputProperties.systemProperty 'tests.rest.cluster', "${-> testClusters.'follow-cluster'.getAllHttpSocketURI().join(",")}"
}
outputs.doNotCacheIf "Caching of REST tests not implemented yet", { false }
}
check.dependsOn followClusterRestartTest

View File

@ -28,8 +28,8 @@ testClusters.'leader-cluster' {
task 'follow-cluster'(type: RestIntegTestTask) {
dependsOn 'leader-cluster'
useCluster testClusters.'leader-cluster'
runner {
useCluster testClusters.'leader-cluster'
systemProperty 'tests.target_cluster', 'follow'
nonInputProperties.systemProperty 'tests.leader_host', "${-> testClusters.'leader-cluster'.getAllHttpSocketURI().get(0)}"
}

View File

@ -35,8 +35,8 @@ testClusters.'leader-cluster' {
task 'follow-cluster'(type: RestIntegTestTask) {
dependsOn 'leader-cluster'
useCluster testClusters.'leader-cluster'
runner {
useCluster testClusters.'leader-cluster'
systemProperty 'tests.target_cluster', 'follow'
nonInputProperties.systemProperty 'tests.leader_host',
"${-> testClusters."leader-cluster".getAllHttpSocketURI().get(0)}"

View File

@ -1,3 +1,5 @@
import org.elasticsearch.gradle.testclusters.RestTestRunnerTask
apply plugin: 'elasticsearch.testclusters'
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
@ -24,13 +26,11 @@ testClusters.integTest {
setting 'xpack.security.enabled', 'false'
}
task integTestSecurity(type: Test) {
task integTestSecurity(type: RestTestRunnerTask) {
description = "Run tests against a cluster that has security"
useCluster testClusters.integTest
dependsOn integTest
systemProperty 'tests.has_security', 'true'
maxParallelForks = 1
outputs.cacheIf "Caching of REST tests not implemented yet", { false }
doFirst {
testClusters.integTest {