[ML] Add timeouts to named pipe connections (#63022)

This PR adds timeouts to the named pipe connections of the
autodetect, normalize and data_frame_analyzer processes.
The timeout is passed to the C++ side as a new command line
argument, so it requires the changes in elastic/ml-cpp#1514 to
work; that PR will be merged before this one.
(The controller process already had a different mechanism,
tied to the ES JVM lifetime.)

Backport of #62993
David Roberts 2020-09-29 18:04:02 +01:00 committed by GitHub
parent 3bee28056f
commit 05427c2bb2
15 changed files with 63 additions and 84 deletions
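For orientation, here is a minimal sketch (not part of this commit) of how the connect timeout flows after this change: it becomes a constructor argument of ProcessPipes and is forwarded to the native process as a --namedPipeConnectTimeout argument. The ProcessPipes constructor signature, addArgs and the TIMEOUT_ARG constant are taken from the diff below; the NamedPipeHelper package location, the process name string, the example timeout value and the boolean pipe flags are illustrative assumptions.

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; // assumed package location

class ProcessConnectTimeoutSketch {

    // Builds the launch command for a hypothetical native process with a 10 second connect timeout.
    List<String> buildCommand(Environment env, NamedPipeHelper namedPipeHelper) {
        // The timeout now travels in the ProcessPipes constructor instead of being passed
        // to every process constructor and to each connect call.
        Duration processConnectTimeout = Duration.ofSeconds(10);
        ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, processConnectTimeout,
                "autodetect", "my_job", false, true, true, false, false);

        // addArgs appends TIMEOUT_ARG + seconds, i.e. "--namedPipeConnectTimeout=10", which the
        // C++ side honours once elastic/ml-cpp#1514 is merged.
        List<String> command = new ArrayList<>();
        processPipes.addArgs(command);
        return command;
    }
}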

AbstractNativeAnalyticsProcess.java

@@ -13,7 +13,6 @@ import org.elasticsearch.xpack.ml.process.ProcessResultsParser;
import java.io.IOException;
import java.nio.file.Path;
-import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
@@ -26,9 +25,9 @@ abstract class AbstractNativeAnalyticsProcess<Result> extends AbstractNativeProc
protected AbstractNativeAnalyticsProcess(String name, ConstructingObjectParser<Result, Void> resultParser, String jobId,
ProcessPipes processPipes, int numberOfFields,
-List<Path> filesToDelete, Consumer<String> onProcessCrash, Duration processConnectTimeout,
+List<Path> filesToDelete, Consumer<String> onProcessCrash,
NamedXContentRegistry namedXContentRegistry) {
-super(jobId, processPipes, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout);
+super(jobId, processPipes, numberOfFields, filesToDelete, onProcessCrash);
this.name = Objects.requireNonNull(name);
this.resultsParser = new ProcessResultsParser<>(Objects.requireNonNull(resultParser), namedXContentRegistry);
}

NativeAnalyticsProcess.java

@@ -21,7 +21,6 @@ import org.elasticsearch.xpack.ml.process.StateToProcessWriterHelper;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Path;
-import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
@@ -36,10 +35,10 @@ public class NativeAnalyticsProcess extends AbstractNativeAnalyticsProcess<Analy
protected NativeAnalyticsProcess(String jobId, ProcessPipes processPipes,
int numberOfFields, List<Path> filesToDelete,
-Consumer<String> onProcessCrash, Duration processConnectTimeout, AnalyticsProcessConfig config,
+Consumer<String> onProcessCrash, AnalyticsProcessConfig config,
NamedXContentRegistry namedXContentRegistry) {
super(NAME, AnalyticsResult.PARSER, jobId, processPipes, numberOfFields,
-filesToDelete, onProcessCrash, processConnectTimeout, namedXContentRegistry);
+filesToDelete, onProcessCrash, namedXContentRegistry);
this.config = Objects.requireNonNull(config);
}

NativeAnalyticsProcessFactory.java

@@ -72,7 +72,7 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
Consumer<String> onProcessCrash) {
String jobId = config.getId();
List<Path> filesToDelete = new ArrayList<>();
-ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId,
+ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, jobId,
false, true, true, hasState, config.getAnalysis().persistsState());
// The extra 2 are for the checksum and the control field
@@ -81,7 +81,7 @@
createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);
NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes,
-numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout,
+numberOfFields, filesToDelete, onProcessCrash,
analyticsProcessConfig, namedXContentRegistry);
try {

NativeMemoryUsageEstimationProcess.java

@@ -11,7 +11,6 @@ import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimatio
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import java.nio.file.Path;
-import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;
@@ -21,9 +20,9 @@ public class NativeMemoryUsageEstimationProcess extends AbstractNativeAnalyticsP
protected NativeMemoryUsageEstimationProcess(String jobId, ProcessPipes processPipes,
int numberOfFields, List<Path> filesToDelete,
-Consumer<String> onProcessCrash, Duration processConnectTimeout) {
+Consumer<String> onProcessCrash) {
super(NAME, MemoryUsageEstimationResult.PARSER, jobId, processPipes,
-numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, NamedXContentRegistry.EMPTY);
+numberOfFields, filesToDelete, onProcessCrash, NamedXContentRegistry.EMPTY);
}
@Override

NativeMemoryUsageEstimationProcessFactory.java

@@ -67,7 +67,7 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce
// memory estimation process pipe names are unique. Therefore an increasing counter value is appended to the config ID
// to ensure uniqueness between calls.
ProcessPipes processPipes = new ProcessPipes(
-env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, config.getId() + "_" + counter.incrementAndGet(),
+env, NAMED_PIPE_HELPER, processConnectTimeout, AnalyticsBuilder.ANALYTICS, config.getId() + "_" + counter.incrementAndGet(),
false, false, true, false, false);
createNativeProcess(config.getId(), analyticsProcessConfig, filesToDelete, processPipes);
@@ -77,8 +77,7 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce
processPipes,
0,
filesToDelete,
-onProcessCrash,
-processConnectTimeout);
+onProcessCrash);
try {
process.start(executorService);

NativeAutodetectProcess.java

@@ -27,7 +27,6 @@ import org.elasticsearch.xpack.ml.process.ProcessResultsParser;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Path;
-import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
@@ -45,9 +44,8 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec
NativeAutodetectProcess(String jobId, ProcessPipes processPipes,
int numberOfFields, List<Path> filesToDelete,
-ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash,
-Duration processConnectTimeout) {
-super(jobId, processPipes, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout);
+ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash) {
+super(jobId, processPipes, numberOfFields, filesToDelete, onProcessCrash);
this.resultsParser = resultsParser;
}

NativeAutodetectProcessFactory.java

@@ -76,9 +76,9 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
ExecutorService executorService,
Consumer<String> onProcessCrash) {
List<Path> filesToDelete = new ArrayList<>();
-ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AutodetectBuilder.AUTODETECT, job.getId(),
-false, true, true, params.modelSnapshot() != null,
-!AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING.get(settings));
+ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, AutodetectBuilder.AUTODETECT,
+job.getId(), false, true, true, params.modelSnapshot() != null,
+AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING.get(settings) == false);
createNativeProcess(job, params, processPipes, filesToDelete);
boolean includeTokensField = MachineLearning.CATEGORIZATION_TOKENIZATION_IN_JAVA
&& job.getAnalysisConfig().getCategorizationFieldName() != null;
@@ -90,7 +90,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
NamedXContentRegistry.EMPTY);
NativeAutodetectProcess autodetect = new NativeAutodetectProcess(
job.getId(), processPipes, numberOfFields,
-filesToDelete, resultsParser, onProcessCrash, processConnectTimeout);
+filesToDelete, resultsParser, onProcessCrash);
try {
autodetect.start(executorService, stateProcessor);
return autodetect;

NativeNormalizerProcess.java

@@ -9,7 +9,6 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.output.NormalizerResult
import org.elasticsearch.xpack.ml.process.AbstractNativeProcess;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
-import java.time.Duration;
import java.util.Collections;
/**
@@ -19,8 +18,8 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize
private static final String NAME = "normalizer";
-NativeNormalizerProcess(String jobId, ProcessPipes processPipes, Duration processConnectTimeout) {
-super(jobId, processPipes, 0, Collections.emptyList(), (ignore) -> {}, processConnectTimeout);
+NativeNormalizerProcess(String jobId, ProcessPipes processPipes) {
+super(jobId, processPipes, 0, Collections.emptyList(), (ignore) -> {});
}
@Override

NativeNormalizerProcessFactory.java

@@ -54,11 +54,11 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
// The job ID passed to the process pipes is only used to make the file names unique. Since normalize can get run many times
// in quick succession for the same job the job ID alone is not sufficient to guarantee that the normalizer process pipe names
// are unique. Therefore an increasing counter value is appended to the job ID to ensure uniqueness between calls.
-ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, NormalizerBuilder.NORMALIZE,
+ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, processConnectTimeout, NormalizerBuilder.NORMALIZE,
jobId + "_" + counter.incrementAndGet(), false, true, true, false, false);
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);
-NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes, processConnectTimeout);
+NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes);
try {
normalizerProcess.start(executorService);

AbstractNativeProcess.java

@@ -57,7 +57,6 @@ public abstract class AbstractNativeProcess implements NativeProcess {
private final int numberOfFields;
private final List<Path> filesToDelete;
private final Consumer<String> onProcessCrash;
-private final Duration processConnectTimeout;
private volatile Future<?> logTailFuture;
private volatile Future<?> stateProcessorFuture;
private volatile boolean processCloseInitiated;
@@ -65,15 +64,13 @@
private volatile boolean isReady;
protected AbstractNativeProcess(String jobId, ProcessPipes processPipes,
-int numberOfFields, List<Path> filesToDelete, Consumer<String> onProcessCrash,
-Duration processConnectTimeout) {
+int numberOfFields, List<Path> filesToDelete, Consumer<String> onProcessCrash) {
this.jobId = jobId;
this.processPipes = processPipes;
this.startTime = ZonedDateTime.now();
this.numberOfFields = numberOfFields;
this.filesToDelete = filesToDelete;
this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
-this.processConnectTimeout = Objects.requireNonNull(processConnectTimeout);
}
public abstract String getName();
@@ -86,7 +83,7 @@
*/
public void start(ExecutorService executorService) throws IOException {
-processPipes.connectLogStream(processConnectTimeout);
+processPipes.connectLogStream();
cppLogHandler.set(processPipes.getLogStreamHandler());
logTailFuture = executorService.submit(() -> {
@@ -101,7 +98,7 @@
}
});
-processPipes.connectOtherStreams(processConnectTimeout);
+processPipes.connectOtherStreams();
if (processPipes.getProcessInStream().isPresent()) {
processInStream.set(new BufferedOutputStream(processPipes.getProcessInStream().get()));
this.recordWriter.set(new LengthEncodedWriter(processInStream.get()));
@@ -220,7 +217,7 @@
try {
// The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID.
// Without the PID we cannot kill the process.
-NativeControllerHolder.getNativeController().killProcess(cppLogHandler().getPid(processConnectTimeout));
+NativeControllerHolder.getNativeController().killProcess(cppLogHandler().getPid(processPipes.getTimeout()));
// Wait for the process to die before closing processInStream as if the process
// is still alive when processInStream is closed it may start persisting state

NativeController.java

@@ -57,11 +57,11 @@ public class NativeController {
private final OutputStream commandStream;
NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper) throws IOException {
-ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER, null,
+ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER_CONNECT_TIMEOUT, CONTROLLER, null,
true, false, false, false, false);
-processPipes.connectLogStream(CONTROLLER_CONNECT_TIMEOUT);
+processPipes.connectLogStream();
tailLogsInThread(processPipes.getLogStreamHandler());
-processPipes.connectOtherStreams(CONTROLLER_CONNECT_TIMEOUT);
+processPipes.connectOtherStreams();
this.localNodeName = localNodeName;
this.cppLogHandler = processPipes.getLogStreamHandler();
this.commandStream = new BufferedOutputStream(processPipes.getCommandStream().get());

ProcessPipes.java

@@ -36,6 +36,7 @@ public class ProcessPipes {
public static final String RESTORE_IS_PIPE_ARG = "--restoreIsPipe";
public static final String PERSIST_ARG = "--persist=";
public static final String PERSIST_IS_PIPE_ARG = "--persistIsPipe";
+public static final String TIMEOUT_ARG = "--namedPipeConnectTimeout=";
private final NamedPipeHelper namedPipeHelper;
private final String jobId;
@@ -50,6 +51,14 @@
private final String restorePipeName;
private final String persistPipeName;
+/**
+ * Needs to be long enough for the C++ process perform all startup tasks that precede creation of named
+ * pipes. There should not be very many of these, so a short timeout should be fine. However, at least
+ * five seconds is recommended due to the vagaries of process scheduling and the way VMs can completely
+ * stall for some hypervisor actions.
+ */
+private final Duration timeout;
private CppLogMessageHandler logStreamHandler;
private OutputStream commandStream;
private OutputStream processInStream;
@@ -66,11 +75,12 @@
* @param jobId The job ID of the process to which pipes are to be opened, if the process is associated with a specific job.
* May be null or empty for processes not associated with a specific job.
*/
-public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, String processName, String jobId,
+public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, Duration timeout, String processName, String jobId,
boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe,
boolean wantRestorePipe, boolean wantPersistPipe) {
this.namedPipeHelper = namedPipeHelper;
this.jobId = jobId;
+this.timeout = timeout;
// The way the pipe names are formed MUST match what is done in the controller main()
// function, as it does not get any command line arguments when started as a daemon. If
@@ -116,6 +126,7 @@ public class ProcessPipes {
command.add(PERSIST_ARG + persistPipeName);
command.add(PERSIST_IS_PIPE_ARG);
}
+command.add(TIMEOUT_ARG + timeout.getSeconds());
}
/**
@@ -123,24 +134,16 @@
* started to read from it</em>so that there is no risk of messages logged in between creation of the other pipes on the C++
* side from blocking due to filling up the named pipe's buffer, and hence deadlocking communications between that process
* and this JVM.
-* @param timeout Needs to be long enough for the C++ process perform all startup tasks that precede creation of named pipes.
-* There should not be very many of these, so a short timeout should be fine. However, at least a couple of
-* seconds is recommended due to the vagaries of process scheduling and the way VMs can completely stall for
-* some hypervisor actions.
*/
-public void connectLogStream(Duration timeout) throws IOException {
+public void connectLogStream() throws IOException {
logStreamHandler = new CppLogMessageHandler(jobId, namedPipeHelper.openNamedPipeInputStream(logPipeName, timeout));
}
/**
* Connect the other pipes created by the C++ process after the logging pipe has been connected. This must be called after
* the corresponding C++ process has been started, and after {@link #connectLogStream}.
-* @param timeout Needs to be long enough for the C++ process perform all startup tasks that precede creation of named pipes.
-* There should not be very many of these, so a short timeout should be fine. However, at least a couple of
-* seconds is recommended due to the vagaries of process scheduling and the way VMs can completely stall for
-* some hypervisor actions.
*/
-public void connectOtherStreams(Duration timeout) throws IOException {
+public void connectOtherStreams() throws IOException {
assert logStreamHandler != null : "Must connect log stream before other streams";
if (logStreamHandler == null) {
throw new NullPointerException("Must connect log stream before other streams");
@@ -255,4 +258,8 @@
}
return Optional.of(persistStream);
}
+public Duration getTimeout() {
+return timeout;
+}
}
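To make the connection protocol documented in the javadoc above concrete, here is a hedged sketch of how a caller such as AbstractNativeProcess drives these methods after this change. Only the ProcessPipes calls come from the diff; the CppLogMessageHandler package location, the executor usage and the error handling are simplified assumptions.

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ExecutorService;

import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler; // assumed package location

class ConnectSequenceSketch {

    void connect(ProcessPipes processPipes, ExecutorService executorService) throws IOException {
        // 1. Connect the log pipe first and start tailing it on another thread, so that log
        //    output from the C++ process cannot fill the pipe buffer and deadlock startup.
        processPipes.connectLogStream();
        CppLogMessageHandler logHandler = processPipes.getLogStreamHandler();
        executorService.execute(() -> {
            try {
                logHandler.tailStream();
            } catch (IOException e) {
                // the real callers report this through their process crash handler
            }
        });

        // 2. Only then connect the remaining pipes; there is no per-call timeout argument any more.
        processPipes.connectOtherStreams();

        // 3. The stored timeout stays available for later waits, e.g. AbstractNativeProcess passes
        //    processPipes.getTimeout() when waiting for the PID from the log stream before a kill.
        Duration connectTimeout = processPipes.getTimeout();
    }
}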

NativeAutodetectProcessTests.java

@@ -40,10 +40,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class NativeAutodetectProcessTests extends ESTestCase {
@@ -75,13 +72,14 @@
when(processPipes.getProcessOutStream()).thenReturn(Optional.of(outputStream));
when(processPipes.getRestoreStream()).thenReturn(Optional.of(restoreStream));
when(processPipes.getPersistStream()).thenReturn(Optional.of(persistStream));
+when(processPipes.getTimeout()).thenReturn(Duration.ofSeconds(randomIntBetween(5, 100)));
}
@SuppressWarnings("unchecked")
public void testProcessStartTime() throws Exception {
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo",
processPipes, NUMBER_FIELDS, null,
-new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
+new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
process.start(executorService, mock(IndexingStateProcessor.class));
ZonedDateTime startTime = process.getProcessStartTime();
@@ -99,7 +97,7 @@
String[] record = {"r1", "r2", "r3", "r4", "r5"};
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo",
processPipes, NUMBER_FIELDS, Collections.emptyList(),
-new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
+new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
process.start(executorService, mock(IndexingStateProcessor.class));
process.writeRecord(record);
@@ -129,7 +127,7 @@
public void testFlush() throws IOException {
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo",
processPipes, NUMBER_FIELDS, Collections.emptyList(),
-new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
+new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
process.start(executorService, mock(IndexingStateProcessor.class));
FlushJobParams params = FlushJobParams.builder().build();
@@ -158,8 +156,7 @@
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo",
processPipes, NUMBER_FIELDS, Collections.emptyList(),
-new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
-Duration.ZERO)) {
+new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
process.start(executorService);
process.consumeAndCloseOutputStream();
@@ -167,28 +164,11 @@
}
}
-@SuppressWarnings("unchecked")
-public void testPipeConnectTimeout() throws IOException {
-int timeoutSeconds = randomIntBetween(5, 100);
-try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo",
-processPipes, NUMBER_FIELDS, Collections.emptyList(),
-new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
-Duration.ofSeconds(timeoutSeconds))) {
-process.start(executorService);
-}
-verify(processPipes, times(1)).connectLogStream(eq(Duration.ofSeconds(timeoutSeconds)));
-verify(processPipes, times(1)).connectOtherStreams(eq(Duration.ofSeconds(timeoutSeconds)));
-}
@SuppressWarnings("unchecked")
private void testWriteMessage(CheckedConsumer<NativeAutodetectProcess> writeFunction, String expectedMessageCode) throws IOException {
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo",
processPipes, NUMBER_FIELDS, Collections.emptyList(),
-new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
+new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class))) {
process.start(executorService, mock(IndexingStateProcessor.class));
writeFunction.accept(process);

AbstractNativeProcessTests.java

@@ -187,7 +187,7 @@
private class TestNativeProcess extends AbstractNativeProcess {
TestNativeProcess() {
-super("foo", processPipes, 0, null, onProcessCrash, Duration.ZERO);
+super("foo", processPipes, 0, null, onProcessCrash);
}
@Override

ProcessPipesTests.java

@@ -60,12 +60,13 @@ public class ProcessPipesTests extends ESTestCase {
when(namedPipeHelper.openNamedPipeInputStream(contains("persist"), any(Duration.class)))
.thenReturn(new ByteArrayInputStream(PERSIST_BYTES));
-ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
-false, true, true, true, true);
+int timeoutSeconds = randomIntBetween(5, 100);
+ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(timeoutSeconds), AutodetectBuilder.AUTODETECT,
+"my_job", false, true, true, true, true);
List<String> command = new ArrayList<>();
processPipes.addArgs(command);
-assertEquals(9, command.size());
+assertEquals(10, command.size());
assertEquals(ProcessPipes.LOG_PIPE_ARG, command.get(0).substring(0, ProcessPipes.LOG_PIPE_ARG.length()));
assertEquals(ProcessPipes.INPUT_ARG, command.get(1).substring(0, ProcessPipes.INPUT_ARG.length()));
assertEquals(ProcessPipes.INPUT_IS_PIPE_ARG, command.get(2));
@@ -75,15 +76,16 @@
assertEquals(ProcessPipes.RESTORE_IS_PIPE_ARG, command.get(6));
assertEquals(ProcessPipes.PERSIST_ARG, command.get(7).substring(0, ProcessPipes.PERSIST_ARG.length()));
assertEquals(ProcessPipes.PERSIST_IS_PIPE_ARG, command.get(8));
+assertEquals(ProcessPipes.TIMEOUT_ARG + timeoutSeconds, command.get(9));
-processPipes.connectLogStream(Duration.ofSeconds(2));
+processPipes.connectLogStream();
CppLogMessageHandler logMessageHandler = processPipes.getLogStreamHandler();
assertNotNull(logMessageHandler);
logMessageHandler.tailStream();
assertEquals(42, logMessageHandler.getPid(Duration.ZERO));
-processPipes.connectOtherStreams(Duration.ofSeconds(2));
+processPipes.connectOtherStreams();
assertFalse(processPipes.getCommandStream().isPresent());
assertTrue(processPipes.getProcessInStream().isPresent());
@@ -108,7 +110,7 @@
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = TestEnvironment.newEnvironment(settings);
-new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
+new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job",
true, true, true, true, true);
}
@@ -135,11 +137,11 @@
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = TestEnvironment.newEnvironment(settings);
-ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
+ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, Duration.ofSeconds(2), AutodetectBuilder.AUTODETECT, "my_job",
true, true, true, true, true);
-processPipes.connectLogStream(Duration.ofSeconds(2));
+processPipes.connectLogStream();
-expectThrows(IOException.class, () -> processPipes.connectOtherStreams(Duration.ofSeconds(2)));
+expectThrows(IOException.class, processPipes::connectOtherStreams);
// check the pipes successfully opened were then closed
verify(logStream, times(1)).close();