Pass processConnectTimeout to the method that fetches C++ process' PID (#50276) (#50290)

This commit is contained in:
Przemysław Witek 2019-12-17 21:32:37 +01:00 committed by GitHub
parent 30d66828ae
commit ac974c35c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 42 additions and 28 deletions

View File

@ -14,6 +14,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
@ -27,9 +28,10 @@ abstract class AbstractNativeAnalyticsProcess<Result> extends AbstractNativeProc
protected AbstractNativeAnalyticsProcess(String name, ConstructingObjectParser<Result, Void> resultParser, String jobId, protected AbstractNativeAnalyticsProcess(String name, ConstructingObjectParser<Result, Void> resultParser, String jobId,
InputStream logStream, OutputStream processInStream, InputStream logStream, OutputStream processInStream,
InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
List<Path> filesToDelete, Consumer<String> onProcessCrash, List<Path> filesToDelete, Consumer<String> onProcessCrash, Duration processConnectTimeout,
NamedXContentRegistry namedXContentRegistry) { NamedXContentRegistry namedXContentRegistry) {
super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash); super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash,
processConnectTimeout);
this.name = Objects.requireNonNull(name); this.name = Objects.requireNonNull(name);
this.resultsParser = new ProcessResultsParser<>(Objects.requireNonNull(resultParser), namedXContentRegistry); this.resultsParser = new ProcessResultsParser<>(Objects.requireNonNull(resultParser), namedXContentRegistry);
} }

View File

@ -14,6 +14,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -26,10 +27,10 @@ public class NativeAnalyticsProcess extends AbstractNativeAnalyticsProcess<Analy
protected NativeAnalyticsProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, protected NativeAnalyticsProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete, OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
Consumer<String> onProcessCrash, AnalyticsProcessConfig config, Consumer<String> onProcessCrash, Duration processConnectTimeout, AnalyticsProcessConfig config,
NamedXContentRegistry namedXContentRegistry) { NamedXContentRegistry namedXContentRegistry) {
super(NAME, AnalyticsResult.PARSER, jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, super(NAME, AnalyticsResult.PARSER, jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields,
filesToDelete, onProcessCrash, namedXContentRegistry); filesToDelete, onProcessCrash, processConnectTimeout, namedXContentRegistry);
this.config = Objects.requireNonNull(config); this.config = Objects.requireNonNull(config);
} }

View File

@ -84,8 +84,8 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes.getLogStream().get(), NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes.getLogStream().get(),
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(),
processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete, onProcessCrash, analyticsProcessConfig, processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout,
namedXContentRegistry); analyticsProcessConfig, namedXContentRegistry);
try { try {
startProcess(config, executorService, processPipes, analyticsProcess); startProcess(config, executorService, processPipes, analyticsProcess);

View File

@ -12,6 +12,7 @@ import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimatio
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration;
import java.util.List; import java.util.List;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -22,9 +23,9 @@ public class NativeMemoryUsageEstimationProcess extends AbstractNativeAnalyticsP
protected NativeMemoryUsageEstimationProcess(String jobId, InputStream logStream, protected NativeMemoryUsageEstimationProcess(String jobId, InputStream logStream,
OutputStream processInStream, InputStream processOutStream, OutputStream processInStream, InputStream processOutStream,
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete, OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
Consumer<String> onProcessCrash) { Consumer<String> onProcessCrash, Duration processConnectTimeout) {
super(NAME, MemoryUsageEstimationResult.PARSER, jobId, logStream, processInStream, processOutStream, processRestoreStream, super(NAME, MemoryUsageEstimationResult.PARSER, jobId, logStream, processInStream, processOutStream, processRestoreStream,
numberOfFields, filesToDelete, onProcessCrash, NamedXContentRegistry.EMPTY); numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, NamedXContentRegistry.EMPTY);
} }
@Override @Override

View File

@ -75,7 +75,8 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce
null, null,
0, 0,
filesToDelete, filesToDelete,
onProcessCrash); onProcessCrash,
processConnectTimeout);
try { try {
process.start(executorService); process.start(executorService);

View File

@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -43,8 +44,10 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec
NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete, OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash) { ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash,
super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash); Duration processConnectTimeout) {
super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash,
processConnectTimeout);
this.resultsParser = resultsParser; this.resultsParser = resultsParser;
} }

View File

@ -91,7 +91,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
NativeAutodetectProcess autodetect = new NativeAutodetectProcess( NativeAutodetectProcess autodetect = new NativeAutodetectProcess(
job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),
processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields, processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields,
filesToDelete, resultsParser, onProcessCrash); filesToDelete, resultsParser, onProcessCrash, processConnectTimeout);
try { try {
autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get()); autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get());
return autodetect; return autodetect;

View File

@ -10,6 +10,7 @@ import org.elasticsearch.xpack.ml.process.AbstractNativeProcess;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.time.Duration;
import java.util.Collections; import java.util.Collections;
/** /**
@ -19,8 +20,9 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize
private static final String NAME = "normalizer"; private static final String NAME = "normalizer";
NativeNormalizerProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream) { NativeNormalizerProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {}); Duration processConnectTimeout) {
super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {}, processConnectTimeout);
} }
@Override @Override

View File

@ -53,7 +53,7 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan); createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);
NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes.getLogStream().get(), NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes.getLogStream().get(),
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get()); processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), processConnectTimeout);
try { try {
normalizerProcess.start(executorService); normalizerProcess.start(executorService);

View File

@ -51,6 +51,7 @@ public abstract class AbstractNativeProcess implements NativeProcess {
private final int numberOfFields; private final int numberOfFields;
private final List<Path> filesToDelete; private final List<Path> filesToDelete;
private final Consumer<String> onProcessCrash; private final Consumer<String> onProcessCrash;
private final Duration processConnectTimeout;
private volatile Future<?> logTailFuture; private volatile Future<?> logTailFuture;
private volatile Future<?> stateProcessorFuture; private volatile Future<?> stateProcessorFuture;
private volatile boolean processCloseInitiated; private volatile boolean processCloseInitiated;
@ -59,17 +60,18 @@ public abstract class AbstractNativeProcess implements NativeProcess {
protected AbstractNativeProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, protected AbstractNativeProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete, OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
Consumer<String> onProcessCrash) { Consumer<String> onProcessCrash, Duration processConnectTimeout) {
this.jobId = jobId; this.jobId = jobId;
cppLogHandler = new CppLogMessageHandler(jobId, logStream); this.cppLogHandler = new CppLogMessageHandler(jobId, logStream);
this.processInStream = processInStream != null ? new BufferedOutputStream(processInStream) : null; this.processInStream = processInStream != null ? new BufferedOutputStream(processInStream) : null;
this.processOutStream = processOutStream; this.processOutStream = processOutStream;
this.processRestoreStream = processRestoreStream; this.processRestoreStream = processRestoreStream;
this.recordWriter = new LengthEncodedWriter(this.processInStream); this.recordWriter = new LengthEncodedWriter(this.processInStream);
startTime = ZonedDateTime.now(); this.startTime = ZonedDateTime.now();
this.numberOfFields = numberOfFields; this.numberOfFields = numberOfFields;
this.filesToDelete = filesToDelete; this.filesToDelete = filesToDelete;
this.onProcessCrash = Objects.requireNonNull(onProcessCrash); this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
this.processConnectTimeout = Objects.requireNonNull(processConnectTimeout);
} }
public abstract String getName(); public abstract String getName();
@ -195,10 +197,9 @@ public abstract class AbstractNativeProcess implements NativeProcess {
LOGGER.debug("[{}] Killing {} process", jobId, getName()); LOGGER.debug("[{}] Killing {} process", jobId, getName());
processKilled = true; processKilled = true;
try { try {
// The PID comes via the processes log stream. We don't wait for it to arrive here, // The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID.
// but if the wait times out it implies the process has only just started, in which // Without the PID we cannot kill the process.
// case it should die very quickly when we close its input stream. NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(processConnectTimeout));
NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(Duration.ZERO));
// Wait for the process to die before closing processInStream as if the process // Wait for the process to die before closing processInStream as if the process
// is still alive when processInStream is closed it may start persisting state // is still alive when processInStream is closed it may start persisting state

View File

@ -25,6 +25,7 @@ import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.Collections; import java.util.Collections;
@ -62,7 +63,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
mock(OutputStream.class), outputStream, mock(OutputStream.class), mock(OutputStream.class), outputStream, mock(OutputStream.class),
NUMBER_FIELDS, null, NUMBER_FIELDS, null,
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
ZonedDateTime startTime = process.getProcessStartTime(); ZonedDateTime startTime = process.getProcessStartTime();
@ -85,7 +86,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
process.writeRecord(record); process.writeRecord(record);
@ -120,7 +121,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024); ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
FlushJobParams params = FlushJobParams.builder().build(); FlushJobParams params = FlushJobParams.builder().build();
@ -154,7 +155,8 @@ public class NativeAutodetectProcessTests extends ESTestCase {
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<AutodetectResult>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { new ProcessResultsParser<AutodetectResult>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
Duration.ZERO)) {
process.consumeAndCloseOutputStream(); process.consumeAndCloseOutputStream();
assertThat(processOutStream.available(), equalTo(0)); assertThat(processOutStream.available(), equalTo(0));
@ -170,7 +172,7 @@ public class NativeAutodetectProcessTests extends ESTestCase {
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) { new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class)); process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
writeFunction.accept(process); writeFunction.accept(process);

View File

@ -16,6 +16,7 @@ import org.junit.Before;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.time.Duration;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -140,7 +141,7 @@ public class AbstractNativeProcessTests extends ESTestCase {
private class TestNativeProcess extends AbstractNativeProcess { private class TestNativeProcess extends AbstractNativeProcess {
TestNativeProcess(OutputStream inputStream) { TestNativeProcess(OutputStream inputStream) {
super("foo", logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash); super("foo", logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash, Duration.ZERO);
} }
@Override @Override