Testclusters: convert plugin repository-s3 (#40399)

* Add support for setting and keystore settings
* system properties and env var config
* use testclusters for repository-s3
* Some cleanup of the build.gradle file for plugin-s3
* add runner {} to rest integ test task
This commit is contained in:
Alpar Torok 2019-03-27 08:37:04 +02:00
parent aa24669e95
commit 524e0273ae
6 changed files with 248 additions and 176 deletions

View File

@ -44,6 +44,7 @@ public class PluginBuildPlugin extends BuildPlugin {
public void apply(Project project) {
super.apply(project)
configureDependencies(project)
// this afterEvaluate must happen before the afterEvaluate added by integTest creation,
// so that the file name resolution for installing the plugin will be setup
project.afterEvaluate {
@ -69,7 +70,7 @@ public class PluginBuildPlugin extends BuildPlugin {
if (isModule) {
throw new RuntimeException("Testclusters does not support modules yet");
} else {
project.testClusters.integTestCluster.plugin(
project.testClusters.integTest.plugin(
project.file(project.tasks.bundlePlugin.archiveFile)
)
}

View File

@ -62,13 +62,13 @@ public class RestIntegTestTask extends DefaultTask {
clusterConfig = project.extensions.create("${name}Cluster", ClusterConfiguration.class, project)
} else {
project.testClusters {
integTestCluster {
"$name" {
distribution = 'INTEG_TEST'
version = project.version
javaHome = project.file(project.ext.runtimeJavaHome)
}
}
runner.useCluster project.testClusters.integTestCluster
runner.useCluster project.testClusters."$name"
}
// override/add more for rest tests
@ -81,7 +81,7 @@ public class RestIntegTestTask extends DefaultTask {
throw new IllegalArgumentException("tests.rest.cluster and tests.cluster must both be null or non-null")
}
if (usesTestclusters == true) {
ElasticsearchNode node = project.testClusters.integTestCluster
ElasticsearchNode node = project.testClusters."${name}"
runner.systemProperty('tests.rest.cluster', {node.allHttpSocketURI.join(",") })
runner.systemProperty('tests.config.dir', {node.getConfigDir()})
runner.systemProperty('tests.cluster', {node.transportPortURI})
@ -187,6 +187,10 @@ public class RestIntegTestTask extends DefaultTask {
clusterInit.mustRunAfter(tasks)
}
public void runner(Closure configure) {
project.tasks.getByName("${name}Runner").configure(configure)
}
/** Print out an excerpt of the log from the given node. */
protected static void printLogExcerpt(NodeInfo nodeInfo) {
File logFile = new File(nodeInfo.homeDir, "logs/${nodeInfo.clusterName}.log")

View File

@ -26,8 +26,10 @@ import org.gradle.api.logging.Logger;
import org.gradle.api.logging.Logging;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
@ -39,15 +41,18 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -71,6 +76,10 @@ public class ElasticsearchNode {
private final LinkedHashMap<String, Predicate<ElasticsearchNode>> waitConditions;
private final List<URI> plugins = new ArrayList<>();
private final Map<String, Supplier<CharSequence>> settings = new LinkedHashMap<>();
private final Map<String, Supplier<CharSequence>> keystoreSettings = new LinkedHashMap<>();
private final Map<String, Supplier<CharSequence>> systemProperties = new LinkedHashMap<>();
private final Map<String, Supplier<CharSequence>> environment = new LinkedHashMap<>();
private final Path confPathRepo;
private final Path configFile;
@ -143,6 +152,55 @@ public class ElasticsearchNode {
plugin(plugin.toURI());
}
/**
 * Registers a secure setting to be added to this node's keystore before startup.
 *
 * @param key   keystore entry name; must not be null
 * @param value literal value for the entry; must not be null
 */
public void keystore(String key, String value) {
    addSupplier("Keystore", keystoreSettings, key, value);
}
/**
 * Registers a secure setting whose value is resolved lazily, when the keystore
 * is actually created at cluster start (useful for fixture ports/addresses that
 * are unknown at configuration time).
 *
 * @param key           keystore entry name; must not be null
 * @param valueSupplier supplier of the entry value; must not be null
 */
public void keystore(String key, Supplier<CharSequence> valueSupplier) {
    addSupplier("Keystore", keystoreSettings, key, valueSupplier);
}
/**
 * Adds an {@code elasticsearch.yml} setting with a fixed value.
 *
 * @param key   setting name; must not be null
 * @param value literal setting value; must not be null
 */
public void setting(String key, String value) {
    // Use the singular "Setting" label so error messages are consistent with the
    // Supplier overload of this method, which already uses "Setting".
    addSupplier("Setting", settings, key, value);
}
/**
 * Adds an {@code elasticsearch.yml} setting whose value is resolved lazily at
 * cluster start.
 *
 * @param key           setting name; must not be null
 * @param valueSupplier supplier of the setting value; must not be null
 */
public void setting(String key, Supplier<CharSequence> valueSupplier) {
    addSupplier("Setting", settings, key, valueSupplier);
}
/**
 * Adds a JVM system property (passed as {@code -Dkey=value}) for this node's
 * Elasticsearch process.
 *
 * @param key   property name; must not be null
 * @param value literal property value; must not be null
 */
public void systemProperty(String key, String value) {
    addSupplier("Java System property", systemProperties, key, value);
}
/**
 * Adds a JVM system property whose value is resolved lazily at cluster start.
 *
 * @param key           property name; must not be null
 * @param valueSupplier supplier of the property value; must not be null
 */
public void systemProperty(String key, Supplier<CharSequence> valueSupplier) {
    addSupplier("Java System property", systemProperties, key, valueSupplier);
}
/**
 * Adds an environment variable for this node's Elasticsearch process.
 *
 * @param key   variable name; must not be null
 * @param value literal variable value; must not be null
 */
public void environment(String key, String value) {
    addSupplier("Environment variable", environment, key, value);
}
/**
 * Adds an environment variable whose value is resolved lazily at cluster start.
 *
 * @param key           variable name; must not be null
 * @param valueSupplier supplier of the variable value; must not be null
 */
public void environment(String key, Supplier<CharSequence> valueSupplier) {
    addSupplier("Environment variable", environment, key, valueSupplier);
}
/**
 * Records a lazily-evaluated configuration entry, rejecting null keys and
 * suppliers eagerly so misconfiguration fails at build-script evaluation time.
 *
 * @param name          human-readable category label used in error messages
 *                      (e.g. "Keystore", "Setting")
 * @param collector     map the entry is stored in; a later entry for the same
 *                      key overwrites an earlier one
 * @param key           entry name; must not be null
 * @param valueSupplier supplier resolved when the cluster is started; must not be null
 */
private void addSupplier(String name, Map<String, Supplier<CharSequence>> collector, String key, Supplier<CharSequence> valueSupplier) {
    requireNonNull(key, name + " key was null when configuring test cluster `" + this + "`");
    requireNonNull(valueSupplier, name + " value supplier was null when configuring test cluster `" + this + "`");
    collector.put(key, valueSupplier);
}
/**
 * Convenience overload for eager (non-lazy) values: null-checks the value up
 * front, then wraps it in a constant supplier and delegates to the lazy overload.
 *
 * @param name        human-readable category label used in error messages
 * @param collector   map the entry is stored in
 * @param key         entry name; must not be null
 * @param actualValue literal value; must not be null
 */
private void addSupplier(String name, Map<String, Supplier<CharSequence>> collector, String key, String actualValue) {
    requireNonNull(actualValue, name + " value was null when configuring test cluster `" + this + "`");
    addSupplier(name, collector, key, () -> actualValue);
}
/**
 * Verifies that every registered supplier yields a non-null value, failing with
 * a descriptive message naming the offending configuration category.
 *
 * @param name      human-readable category label used in error messages
 * @param collector the supplier map to validate
 */
private void checkSuppliers(String name, Map<String, Supplier<CharSequence>> collector) {
    collector.forEach((key, value) -> {
        // Null-check the supplied value itself. The previous form,
        // requireNonNull(value.get().toString(), msg), threw a bare NPE from
        // toString() before requireNonNull could attach the descriptive message
        // whenever a supplier returned null.
        requireNonNull(value.get(), name + " supplied value was null when configuring test cluster `" + this + "`");
    });
}
/**
 * @return the directory containing this node's configuration file
 *         (the parent of {@code configFile})
 */
public Path getConfigDir() {
    return configFile.getParent();
}
@ -168,6 +226,8 @@ public class ElasticsearchNode {
return javaHome;
}
private void waitForUri(String description, String uri) {
waitConditions.put(description, (node) -> {
try {
@ -222,10 +282,19 @@ public class ElasticsearchNode {
"install", "--batch", plugin.toString())
);
if (keystoreSettings.isEmpty() == false) {
checkSuppliers("Keystore", keystoreSettings);
runElaticsearchBinScript("elasticsearch-keystore", "create");
keystoreSettings.forEach((key, value) -> {
runElaticsearchBinScriptWithInput(value.get().toString(), "elasticsearch-keystore", "add", "-x", key);
});
}
startElasticsearchProcess();
}
private void runElaticsearchBinScript(String tool, String... args) {
private void runElaticsearchBinScriptWithInput(String input, String tool, String... args) {
try (InputStream byteArrayInputStream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))) {
services.loggedExec(spec -> {
spec.setEnvironment(getESEnvironment());
spec.workingDir(workingDir);
@ -249,19 +318,43 @@ public class ElasticsearchNode {
.onUnix(() -> Arrays.asList(args))
.supply()
);
spec.setStandardInput(byteArrayInputStream);
});
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
/**
 * Runs the given tool from the distribution's bin directory with empty standard
 * input, delegating to the with-input variant.
 * NOTE(review): the method name carries a pre-existing typo ("Elaticsearch");
 * kept as-is since callers elsewhere in the file reference it by this name.
 */
private void runElaticsearchBinScript(String tool, String... args) {
    runElaticsearchBinScriptWithInput("", tool, args);
}
private Map<String, String> getESEnvironment() {
Map<String, String> environment= new HashMap<>();
environment.put("JAVA_HOME", getJavaHome().getAbsolutePath());
environment.put("ES_PATH_CONF", configFile.getParent().toString());
environment.put("ES_JAVA_OPTS", "-Xms512m -Xmx512m");
environment.put("ES_TMPDIR", tmpDir.toString());
Map<String, String> defaultEnv = new HashMap<>();
defaultEnv.put("JAVA_HOME", getJavaHome().getAbsolutePath());
defaultEnv.put("ES_PATH_CONF", configFile.getParent().toString());
String systemPropertiesString = "";
if (systemProperties.isEmpty() == false) {
checkSuppliers("Java System property", systemProperties);
systemPropertiesString = " " + systemProperties.entrySet().stream()
.map(entry -> "-D" + entry.getKey() + "=" + entry.getValue().get())
.collect(Collectors.joining(" "));
}
defaultEnv.put("ES_JAVA_OPTS", "-Xms512m -Xmx512m -ea -esa" + systemPropertiesString);
defaultEnv.put("ES_TMPDIR", tmpDir.toString());
// Windows requires this as it defaults to `c:\windows` despite ES_TMPDIR
defaultEnv.put("TMP", tmpDir.toString());
environment.put("TMP", tmpDir.toString());
return environment;
Set<String> commonKeys = new HashSet<>(environment.keySet());
commonKeys.retainAll(defaultEnv.keySet());
if (commonKeys.isEmpty() == false) {
throw new IllegalStateException("testcluster does not allow setting the following env vars " + commonKeys);
}
checkSuppliers("Environment variable", environment);
environment.forEach((key, value) -> defaultEnv.put(key, value.get().toString()));
return defaultEnv;
}
private void startElasticsearchProcess() {
@ -445,37 +538,49 @@ public class ElasticsearchNode {
}
private void createConfiguration() {
LinkedHashMap<String, String> config = new LinkedHashMap<>();
LinkedHashMap<String, String> defaultConfig = new LinkedHashMap<>();
String nodeName = safeName(name);
config.put("cluster.name",nodeName);
config.put("node.name", nodeName);
config.put("path.repo", confPathRepo.toAbsolutePath().toString());
config.put("path.data", confPathData.toAbsolutePath().toString());
config.put("path.logs", confPathLogs.toAbsolutePath().toString());
config.put("path.shared_data", workingDir.resolve("sharedData").toString());
config.put("node.attr.testattr", "test");
config.put("node.portsfile", "true");
config.put("http.port", "0");
config.put("transport.tcp.port", "0");
defaultConfig.put("cluster.name",nodeName);
defaultConfig.put("node.name", nodeName);
defaultConfig.put("path.repo", confPathRepo.toAbsolutePath().toString());
defaultConfig.put("path.data", confPathData.toAbsolutePath().toString());
defaultConfig.put("path.logs", confPathLogs.toAbsolutePath().toString());
defaultConfig.put("path.shared_data", workingDir.resolve("sharedData").toString());
defaultConfig.put("node.attr.testattr", "test");
defaultConfig.put("node.portsfile", "true");
defaultConfig.put("http.port", "0");
defaultConfig.put("transport.tcp.port", "0");
// Default the watermarks to absurdly low to prevent the tests from failing on nodes without enough disk space
config.put("cluster.routing.allocation.disk.watermark.low", "1b");
config.put("cluster.routing.allocation.disk.watermark.high", "1b");
defaultConfig.put("cluster.routing.allocation.disk.watermark.low", "1b");
defaultConfig.put("cluster.routing.allocation.disk.watermark.high", "1b");
// increase script compilation limit since tests can rapid-fire script compilations
config.put("script.max_compilations_rate", "2048/1m");
defaultConfig.put("script.max_compilations_rate", "2048/1m");
if (Version.fromString(version).getMajor() >= 6) {
config.put("cluster.routing.allocation.disk.watermark.flood_stage", "1b");
defaultConfig.put("cluster.routing.allocation.disk.watermark.flood_stage", "1b");
}
if (Version.fromString(version).getMajor() >= 7) {
config.put("cluster.initial_master_nodes", "[" + nodeName + "]");
defaultConfig.put("cluster.initial_master_nodes", "[" + nodeName + "]");
}
checkSuppliers("Settings", settings);
Map<String, String> userConfig = settings.entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().get().toString()));
HashSet<String> overriden = new HashSet<>(defaultConfig.keySet());
overriden.retainAll(userConfig.keySet());
if (overriden.isEmpty() ==false) {
throw new IllegalArgumentException("Testclusters does not allow the following settings to be changed:" + overriden);
}
try {
// We create hard links for the distribution, so we need to remove the config file before writing it
// to prevent the changes to reflect across all copies.
Files.delete(configFile);
Files.write(
configFile,
config.entrySet().stream()
Stream.concat(
userConfig.entrySet().stream(),
defaultConfig.entrySet().stream()
)
.map(entry -> entry.getKey() + ": " + entry.getValue())
.collect(Collectors.joining("\n"))
.getBytes(StandardCharsets.UTF_8)

View File

@ -21,7 +21,7 @@
configure(subprojects.findAll { it.parent.path == project.path }) {
group = 'org.elasticsearch.plugin'
// TODO exclude some plugins as they require features not yet supported by testclusters
if (false == name in ['repository-azure', 'repository-hdfs', 'repository-s3']) {
if (false == name in ['repository-hdfs']) {
apply plugin: 'elasticsearch.testclusters'
}

View File

@ -65,7 +65,9 @@ check {
dependsOn 'qa:microsoft-azure-storage:check'
}
integTestCluster {
keystoreSetting 'azure.client.integration_test.account', 'azure_account'
keystoreSetting 'azure.client.integration_test.key', 'azure_key'
testClusters {
integTest {
keystore 'azure.client.integration_test.account', 'azure_account'
keystore 'azure.client.integration_test.key', 'azure_key'
}
}

View File

@ -1,7 +1,6 @@
import org.elasticsearch.gradle.BuildPlugin
import org.elasticsearch.gradle.MavenFilteringHack
import org.elasticsearch.gradle.test.AntFixture
import org.elasticsearch.gradle.test.ClusterConfiguration
import org.elasticsearch.gradle.test.RestIntegTestTask
import com.carrotsearch.gradle.junit4.RandomizedTestingTask
@ -71,7 +70,7 @@ task testRepositoryCreds(type: RandomizedTestingTask) {
include '**/S3BlobStoreRepositoryTests.class'
systemProperty 'es.allow_insecure_settings', 'true'
}
project.check.dependsOn(testRepositoryCreds)
check.dependsOn(testRepositoryCreds)
unitTest {
// these are tested explicitly in separate test tasks
@ -136,26 +135,9 @@ if (!s3EC2Bucket && !s3EC2BasePath && !s3ECSBucket && !s3ECSBasePath) {
throw new IllegalArgumentException("not all options specified to run EC2/ECS tests are present")
}
buildscript {
repositories {
maven {
url 'https://plugins.gradle.org/m2/'
}
}
dependencies {
classpath 'de.undercouch:gradle-download-task:3.4.3'
}
}
if (useFixture) {
apply plugin: 'elasticsearch.test.fixtures'
RestIntegTestTask integTestMinio = project.tasks.create('integTestMinio', RestIntegTestTask.class) {
description = "Runs REST tests using the Minio repository."
}
Task writeDockerFile = project.tasks.create('writeDockerFile') {
task writeDockerFile {
File minioDockerfile = new File("${project.buildDir}/minio-docker/Dockerfile")
outputs.file(minioDockerfile)
doLast {
@ -166,48 +148,44 @@ if (useFixture) {
"ENV MINIO_SECRET_KEY ${s3PermanentSecretKey}"
}
}
preProcessFixture.dependsOn(writeDockerFile)
// The following closure must execute before the afterEvaluate block in the constructor of the following integrationTest tasks:
project.afterEvaluate {
// Only configure the Minio tests if postProcessFixture is configured to skip them if Docker is not available
// or fixtures have been disabled
if (postProcessFixture.enabled) {
ClusterConfiguration cluster = project.extensions.getByName('integTestMinioCluster') as ClusterConfiguration
cluster.dependsOn(project.bundlePlugin)
cluster.dependsOn(postProcessFixture)
cluster.keystoreSetting 's3.client.integration_test_permanent.access_key', s3PermanentAccessKey
cluster.keystoreSetting 's3.client.integration_test_permanent.secret_key', s3PermanentSecretKey
Closure<String> minioAddressAndPort = {
int minioPort = postProcessFixture.ext."test.fixtures.minio-fixture.tcp.9000"
assert minioPort > 0
return 'http://127.0.0.1:' + minioPort
}
cluster.setting 's3.client.integration_test_permanent.endpoint', "${-> minioAddressAndPort.call()}"
Task restIntegTestTask = project.tasks.getByName('integTestMinio')
restIntegTestTask.clusterConfig.plugin(project.path)
// Default jvm arguments for all test clusters
String jvmArgs = "-Xms" + System.getProperty('tests.heap.size', '512m') +
" " + "-Xmx" + System.getProperty('tests.heap.size', '512m') +
" " + System.getProperty('tests.jvm.argline', '')
restIntegTestTask.clusterConfig.jvmArgs = jvmArgs
project.check.dependsOn(integTestMinio)
}
preProcessFixture {
dependsOn(writeDockerFile)
}
integTestMinioRunner.dependsOn(postProcessFixture)
task integTestMinio(type: RestIntegTestTask) {
description = "Runs REST tests using the Minio repository."
dependsOn tasks.bundlePlugin, tasks.postProcessFixture
runner {
// Minio only supports a single access key, see https://github.com/minio/minio/pull/5968
integTestMinioRunner.systemProperty 'tests.rest.blacklist', [
systemProperty 'tests.rest.blacklist', [
'repository_s3/30_repository_temporary_credentials/*',
'repository_s3/40_repository_ec2_credentials/*',
'repository_s3/50_repository_ecs_credentials/*'
].join(",")
}
}
check.dependsOn(integTestMinio)
BuildPlugin.requireDocker(tasks.integTestMinio)
BuildPlugin.requireDocker(integTestMinio)
testClusters.integTestMinio {
keystore 's3.client.integration_test_permanent.access_key', s3PermanentAccessKey
keystore 's3.client.integration_test_permanent.secret_key', s3PermanentSecretKey
setting 's3.client.integration_test_permanent.endpoint', {
int minioPort = postProcessFixture.ext."test.fixtures.minio-fixture.tcp.9000"
assert minioPort > 0
return 'http://127.0.0.1:' + minioPort
}
plugin file(tasks.bundlePlugin.archiveFile)
}
} else {
integTest.runner {
systemProperty 'tests.rest.blacklist',
[
'repository_s3/30_repository_temporary_credentials/*',
'repository_s3/40_repository_ec2_credentials/*',
'repository_s3/50_repository_ecs_credentials/*'
].join(",")
}
}
File parentFixtures = new File(project.buildDir, "fixtures")
@ -242,6 +220,7 @@ task s3Fixture(type: AntFixture) {
args 'org.elasticsearch.repositories.s3.AmazonS3Fixture', baseDir, s3FixtureFile.getAbsolutePath()
}
processTestResources {
Map<String, Object> expansions = [
'permanent_bucket': s3PermanentBucket,
'permanent_base_path': s3PermanentBasePath,
@ -252,72 +231,58 @@ Map<String, Object> expansions = [
'ecs_bucket': s3ECSBucket,
'ecs_base_path': s3ECSBasePath
]
processTestResources {
inputs.properties(expansions)
MavenFilteringHack.filter(it, expansions)
}
project.afterEvaluate {
if (useFixture == false) {
// temporary_credentials, ec2_credentials and ecs_credentials are not ready for third-party-tests yet
integTestRunner.systemProperty 'tests.rest.blacklist',
[
'repository_s3/30_repository_temporary_credentials/*',
'repository_s3/40_repository_ec2_credentials/*',
'repository_s3/50_repository_ecs_credentials/*'
].join(",")
}
integTest {
dependsOn s3Fixture
}
integTestCluster {
keystoreSetting 's3.client.integration_test_permanent.access_key', s3PermanentAccessKey
keystoreSetting 's3.client.integration_test_permanent.secret_key', s3PermanentSecretKey
testClusters.integTest {
keystore 's3.client.integration_test_permanent.access_key', s3PermanentAccessKey
keystore 's3.client.integration_test_permanent.secret_key', s3PermanentSecretKey
keystoreSetting 's3.client.integration_test_temporary.access_key', s3TemporaryAccessKey
keystoreSetting 's3.client.integration_test_temporary.secret_key', s3TemporarySecretKey
keystoreSetting 's3.client.integration_test_temporary.session_token', s3TemporarySessionToken
keystore 's3.client.integration_test_temporary.access_key', s3TemporaryAccessKey
keystore 's3.client.integration_test_temporary.secret_key', s3TemporarySecretKey
keystore 's3.client.integration_test_temporary.session_token', s3TemporarySessionToken
if (useFixture) {
dependsOn s3Fixture
/* Use a closure on the string to delay evaluation until tests are executed */
setting 's3.client.integration_test_permanent.endpoint', "http://${-> s3Fixture.addressAndPort}"
setting 's3.client.integration_test_temporary.endpoint', "http://${-> s3Fixture.addressAndPort}"
setting 's3.client.integration_test_ec2.endpoint', "http://${-> s3Fixture.addressAndPort}"
setting 's3.client.integration_test_permanent.endpoint', { "http://${s3Fixture.addressAndPort}" }
setting 's3.client.integration_test_temporary.endpoint', { "http://${s3Fixture.addressAndPort}" }
setting 's3.client.integration_test_ec2.endpoint', { "http://${s3Fixture.addressAndPort}" }
// to redirect InstanceProfileCredentialsProvider to custom auth point
systemProperty "com.amazonaws.sdk.ec2MetadataServiceEndpointOverride", "http://${-> s3Fixture.addressAndPort}"
systemProperty "com.amazonaws.sdk.ec2MetadataServiceEndpointOverride", { "http://${s3Fixture.addressAndPort}" }
} else {
println "Using an external service to test the repository-s3 plugin"
}
}
integTestRunner.systemProperty 'tests.rest.blacklist', 'repository_s3/50_repository_ecs_credentials/*'
if (useFixture) {
RestIntegTestTask integTestECS = project.tasks.create('integTestECS', RestIntegTestTask.class) {
description = "Runs tests using the ECS repository."
integTest.runner {
systemProperty 'tests.rest.blacklist', 'repository_s3/50_repository_ecs_credentials/*'
}
// The following closure must execute before the afterEvaluate block in the constructor of the following integrationTest tasks:
project.afterEvaluate {
ClusterConfiguration cluster = project.extensions.getByName('integTestECSCluster') as ClusterConfiguration
cluster.dependsOn(project.s3Fixture)
cluster.setting 's3.client.integration_test_ecs.endpoint', "http://${-> s3Fixture.addressAndPort}"
Task integTestECSTask = project.tasks.getByName('integTestECS')
integTestECSTask.clusterConfig.plugin(project.path)
integTestECSTask.clusterConfig.environment 'AWS_CONTAINER_CREDENTIALS_FULL_URI',
"http://${-> s3Fixture.addressAndPort}/ecs_credentials_endpoint"
integTestECSRunner.systemProperty 'tests.rest.blacklist', [
if (useFixture) {
task integTestECS(type: RestIntegTestTask.class) {
description = "Runs tests using the ECS repository."
dependsOn(project.s3Fixture)
runner {
systemProperty 'tests.rest.blacklist', [
'repository_s3/10_basic/*',
'repository_s3/20_repository_permanent_credentials/*',
'repository_s3/30_repository_temporary_credentials/*',
'repository_s3/40_repository_ec2_credentials/*'
].join(",")
}
project.check.dependsOn(integTestECS)
}
check.dependsOn(integTestECS)
testClusters.integTestECS {
setting 's3.client.integration_test_ecs.endpoint', { "http://${s3Fixture.addressAndPort}" }
plugin file(tasks.bundlePlugin.archiveFile)
environment 'AWS_CONTAINER_CREDENTIALS_FULL_URI', { "http://${s3Fixture.addressAndPort}/ecs_credentials_endpoint" }
}
}
thirdPartyAudit.ignoreMissingClasses (
@ -446,8 +411,3 @@ if (project.runtimeJavaVersion <= JavaVersion.VERSION_1_8) {
} else {
thirdPartyAudit.ignoreMissingClasses 'javax.activation.DataHandler'
}
// AWS SDK is exposing some deprecated methods which we call using a delegate:
// * setObjectRedirectLocation(String bucketName, String key, String newRedirectLocation)
// * changeObjectStorageClass(String bucketName, String key, StorageClass newStorageClass)
compileTestJava.options.compilerArgs << "-Xlint:-deprecation"