Switch testclusters to use nio (#37365)

This commit is contained in:
Alpar Torok 2019-01-24 17:30:24 +02:00 committed by GitHub
parent ddee1926f9
commit e7aa7e909a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@@ -27,13 +27,14 @@ import org.gradle.internal.os.OperatingSystem;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
@@ -45,6 +46,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -56,8 +58,8 @@ public class ElasticsearchNode {
private final String name;
private final GradleServicesAdapter services;
private final AtomicBoolean configurationFrozen = new AtomicBoolean(false);
private final File artifactsExtractDir;
private final File workingDir;
private final Path artifactsExtractDir;
private final Path workingDir;
private static final int ES_DESTROY_TIMEOUT = 20;
private static final TimeUnit ES_DESTROY_TIMEOUT_UNIT = TimeUnit.SECONDS;
@@ -65,6 +67,15 @@ public class ElasticsearchNode {
private static final TimeUnit NODE_UP_TIMEOUT_UNIT = TimeUnit.SECONDS;
private final LinkedHashMap<String, Predicate<ElasticsearchNode>> waitConditions;
private final Path confPathRepo;
private final Path configFile;
private final Path confPathData;
private final Path confPathLogs;
private final Path transportPortFile;
private final Path httpPortsFile;
private final Path esStdoutFile;
private final Path esStderrFile;
private Distribution distribution;
private String version;
private File javaHome;
@@ -75,11 +86,19 @@ public class ElasticsearchNode {
this.path = path;
this.name = name;
this.services = services;
this.artifactsExtractDir = artifactsExtractDir;
this.workingDir = new File(workingDirBase, safeName(name));
this.artifactsExtractDir = artifactsExtractDir.toPath();
this.workingDir = workingDirBase.toPath().resolve(safeName(name)).toAbsolutePath();
confPathRepo = workingDir.resolve("repo");
configFile = workingDir.resolve("config/elasticsearch.yml");
confPathData = workingDir.resolve("data");
confPathLogs = workingDir.resolve("logs");
transportPortFile = confPathLogs.resolve("transport.ports");
httpPortsFile = confPathLogs.resolve("http.ports");
esStdoutFile = confPathLogs.resolve("es.stdout.log");
esStderrFile = confPathLogs.resolve("es.stderr.log");
this.waitConditions = new LinkedHashMap<>();
waitConditions.put("http ports file", node -> node.getHttpPortsFile().exists());
waitConditions.put("transport ports file", node -> node.getTransportPortFile().exists());
waitConditions.put("http ports file", node -> Files.exists(node.httpPortsFile));
waitConditions.put("transport ports file", node -> Files.exists(node.transportPortFile));
waitForUri("cluster health yellow", "/_cluster/health?wait_for_nodes=>=1&wait_for_status=yellow");
}
@@ -149,39 +168,39 @@ public class ElasticsearchNode {
// Starts the node: resolves the extracted distribution directory for the
// configured distribution/version, validates that it exists and is a
// directory, syncs its bundled config folder next to elasticsearch.yml,
// writes the node configuration (configure()) and forks the ES process.
// NOTE(review): this span is a unified diff with OLD (java.io.File) and NEW
// (java.nio.file.Path) lines interleaved; the Path-based lines are the NEW side.
synchronized void start() {
logger.info("Starting `{}`", this);
// OLD: File-based artifact lookup (removed by this commit).
File distroArtifact = new File(
new File(artifactsExtractDir, distribution.getFileExtension()),
distribution.getFileName() + "-" + getVersion()
);
if (distroArtifact.exists() == false) {
// NEW: Path-based lookup, i.e. <extractDir>/<extension>/<fileName>-<version>.
Path distroArtifact = artifactsExtractDir
.resolve(distribution.getFileExtension())
.resolve(distribution.getFileName() + "-" + getVersion());
if (Files.exists(distroArtifact) == false) {
throw new TestClustersException("Can not start " + this + ", missing: " + distroArtifact);
}
// The artifact is expected to be an already-extracted directory, not an archive.
if (distroArtifact.isDirectory() == false) {
if (Files.isDirectory(distroArtifact) == false) {
throw new TestClustersException("Can not start " + this + ", is not a directory: " + distroArtifact);
}
// Copy the distribution's default config directory into this node's config dir.
services.sync(spec -> {
spec.from(new File(distroArtifact, "config"));
spec.into(getConfigFile().getParent());
spec.from(distroArtifact.resolve("config").toFile());
spec.into(configFile.getParent());
});
configure();
startElasticsearchProcess(distroArtifact);
}
private void startElasticsearchProcess(File distroArtifact) {
private void startElasticsearchProcess(Path distroArtifact) {
logger.info("Running `bin/elasticsearch` in `{}` for {}", workingDir, this);
final ProcessBuilder processBuilder = new ProcessBuilder();
if (OperatingSystem.current().isWindows()) {
processBuilder.command(
"cmd", "/c",
new File(distroArtifact, "\\bin\\elasticsearch.bat").getAbsolutePath()
distroArtifact.resolve("\\bin\\elasticsearch.bat").toAbsolutePath().toString()
);
} else {
processBuilder.command(
new File(distroArtifact.getAbsolutePath(), "bin/elasticsearch").getAbsolutePath()
distroArtifact.resolve("bin/elasticsearch").toAbsolutePath().toString()
);
}
try {
processBuilder.directory(workingDir);
processBuilder.directory(workingDir.toFile());
Map<String, String> environment = processBuilder.environment();
// Don't inherit anything from the environment for as that would lack reproductability
environment.clear();
@@ -195,11 +214,11 @@ public class ElasticsearchNode {
} else {
logger.warn("{}: No javaHome configured, will rely on default java detection", this);
}
environment.put("ES_PATH_CONF", getConfigFile().getParentFile().getAbsolutePath());
environment.put("ES_PATH_CONF", configFile.getParent().toAbsolutePath().toString());
environment.put("ES_JAVA_OPTIONS", "-Xms512m -Xmx512m");
// don't buffer all in memory, make sure we don't block on the default pipes
processBuilder.redirectError(ProcessBuilder.Redirect.appendTo(getStdErrFile()));
processBuilder.redirectOutput(ProcessBuilder.Redirect.appendTo(getStdoutFile()));
processBuilder.redirectError(ProcessBuilder.Redirect.appendTo(esStderrFile.toFile()));
processBuilder.redirectOutput(ProcessBuilder.Redirect.appendTo(esStdoutFile.toFile()));
esProcess = processBuilder.start();
} catch (IOException e) {
throw new TestClustersException("Failed to start ES process for " + this, e);
@@ -226,8 +245,8 @@ public class ElasticsearchNode {
requireNonNull(esProcess, "Can't stop `" + this + "` as it was not started or already stopped.");
stopHandle(esProcess.toHandle());
if (tailLogs) {
logFileContents("Standard output of node", getStdoutFile());
logFileContents("Standard error of node", getStdErrFile());
logFileContents("Standard output of node", esStdoutFile);
logFileContents("Standard error of node", esStderrFile);
}
esProcess = null;
}
@@ -265,14 +284,14 @@ public class ElasticsearchNode {
);
}
// Dumps the contents of the given log file at ERROR level, prefixing each
// line with the node name so interleaved multi-node output stays attributable.
// NOTE(review): diff span — OLD File/BufferedReader lines interleave with the
// NEW Path/Files.lines lines below.
private void logFileContents(String description, File from) {
private void logFileContents(String description, Path from) {
logger.error("{} `{}`", description, this);
try (BufferedReader reader = new BufferedReader(new FileReader(from))) {
reader.lines()
// NEW: explicit UTF-8, and the Stream is closed by try-with-resources.
try(Stream<String> lines = Files.lines(from, StandardCharsets.UTF_8)) {
lines
.map(line -> " [" + name + "]" + line)
.forEach(logger::error);
} catch (IOException e) {
throw new TestClustersException("Error reading " + description, e);
// NEW: rethrows unchecked; note the description is no longer in the message.
throw new UncheckedIOException(e);
}
}
@@ -289,48 +308,25 @@ public class ElasticsearchNode {
}
}
// NOTE(review): OLD side of the diff — these File-returning accessors are
// removed by this commit in favour of final Path fields resolved once in the
// constructor (configFile, confPathData, confPathRepo, confPathLogs,
// esStdoutFile, esStderrFile); sharedData is now resolved inline in configure().
private File getConfigFile() {
return new File(workingDir, "config/elasticsearch.yml");
}
private File getConfPathData() {
return new File(workingDir, "data");
}
private File getConfPathSharedData() {
return new File(workingDir, "sharedData");
}
private File getConfPathRepo() {
return new File(workingDir, "repo");
}
private File getConfPathLogs() {
return new File(workingDir, "logs");
}
// Stdout/stderr capture files for the forked ES process live under the logs dir.
private File getStdoutFile() {
return new File(getConfPathLogs(), "es.stdout.log");
}
private File getStdErrFile() {
return new File(getConfPathLogs(), "es.stderr.log");
}
private void configure() {
getConfigFile().getParentFile().mkdirs();
getConfPathRepo().mkdirs();
getConfPathData().mkdirs();
getConfPathSharedData().mkdirs();
getConfPathLogs().mkdirs();
try {
Files.createDirectories(configFile.getParent());
Files.createDirectories(confPathRepo);
Files.createDirectories(confPathData);
Files.createDirectories(confPathLogs);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
LinkedHashMap<String, String> config = new LinkedHashMap<>();
String nodeName = safeName(name);
config.put("cluster.name",nodeName);
config.put("node.name", nodeName);
config.put("path.repo", getConfPathRepo().getAbsolutePath());
config.put("path.data", getConfPathData().getAbsolutePath());
config.put("path.logs", getConfPathLogs().getAbsolutePath());
config.put("path.shared_data", getConfPathSharedData().getAbsolutePath());
config.put("path.repo", confPathRepo.toAbsolutePath().toString());
config.put("path.data", confPathData.toAbsolutePath().toString());
config.put("path.logs", confPathLogs.toAbsolutePath().toString());
config.put("path.shared_data", workingDir.resolve("sharedData").toString());
config.put("node.attr.testattr", "test");
config.put("node.portsfile", "true");
config.put("http.port", "0");
@@ -348,16 +344,16 @@ public class ElasticsearchNode {
}
try {
Files.write(
getConfigFile().toPath(),
configFile,
config.entrySet().stream()
.map(entry -> entry.getKey() + ": " + entry.getValue())
.collect(Collectors.joining("\n"))
.getBytes(StandardCharsets.UTF_8)
);
} catch (IOException e) {
throw new TestClustersException("Could not write config file: " + getConfigFile(), e);
throw new UncheckedIOException("Could not write config file: " + configFile, e);
}
logger.info("Written config file:{} for {}", getConfigFile(), this);
logger.info("Written config file:{} for {}", configFile, this);
}
private void checkFrozen() {
@@ -372,41 +368,29 @@ public class ElasticsearchNode {
.replaceAll("[^a-zA-Z0-9]+", "-");
}
// NOTE(review): OLD side of the diff — replaced by the httpPortsFile and
// transportPortFile Path fields initialised in the constructor.
private File getHttpPortsFile() {
return new File(getConfPathLogs(), "http.ports");
}
private File getTransportPortFile() {
return new File(getConfPathLogs(), "transport.ports");
}
// Reads the transport ports file written by the node (node.portsfile=true is
// set in configure()), one entry per line.
// NOTE(review): diff span — OLD lines (local File + TestClustersException)
// interleave with NEW lines (Path field + UncheckedIOException).
private List<String> getTransportPortInternal() {
File transportPortFile = getTransportPortFile();
try {
return readPortsFile(getTransportPortFile());
return readPortsFile(transportPortFile);
} catch (IOException e) {
throw new TestClustersException(
throw new UncheckedIOException(
"Failed to read transport ports file: " + transportPortFile + " for " + this, e
);
}
}
// Reads the http ports file written by the node (node.portsfile=true is set
// in configure()), one entry per line.
// NOTE(review): diff span — OLD lines (local File + TestClustersException)
// interleave with NEW lines (Path field + UncheckedIOException).
private List<String> getHttpPortInternal() {
File httpPortsFile = getHttpPortsFile();
try {
return readPortsFile(getHttpPortsFile());
return readPortsFile(httpPortsFile);
} catch (IOException e) {
throw new TestClustersException(
throw new UncheckedIOException(
"Failed to read http ports file: " + httpPortsFile + " for " + this, e
);
}
}
// Reads a ports file into a list of trimmed lines.
// NOTE(review): diff span — the OLD BufferedReader/FileReader version used the
// platform default charset; the NEW version reads UTF-8 explicitly via
// Files.lines and closes the Stream with try-with-resources.
private List<String> readPortsFile(File file) throws IOException {
try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
return reader.lines()
.map(String::trim)
.collect(Collectors.toList());
private List<String> readPortsFile(Path file) throws IOException {
try(Stream<String> lines = Files.lines(file, StandardCharsets.UTF_8)) {
return lines.map(String::trim).collect(Collectors.toList());
}
}