Mirror of https://github.com/honeymoose/OpenSearch.git, synced 2025-02-17 10:25:15 +00:00
[ML] Tail the C++ logging pipe before connecting other pipes (#56701)
Prior to this change, the named pipes that connect the ML C++ processes to the Elasticsearch JVM were all opened before any of them were read from or written to. This created a problem: if the C++ process logged more messages between opening the log pipe and opening the last pipe to be connected than there was space for in the named pipe's buffer, then the C++ process would block. It would then never get as far as opening the last named pipe, so the JVM would never get as far as reading from the log pipe, hence a deadlock. This change alters the connection order so that the JVM starts reading from the logging pipe immediately after opening it. Any messages the C++ process logs while opening the other named pipes are then consumed in a timely manner, and there is no danger of a deadlock. Backport of #56632
This commit is contained in:
parent ac432f6612
commit 3051c37f92
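For readers who want to see the failure mode concretely, here is a small, self-contained Java sketch of the scenario the commit message describes. It uses an in-memory PipedInputStream with a deliberately tiny buffer as a stand-in for the real named pipe; the class name, buffer size, and message text are illustrative only and are not part of the change.

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

// Demonstrates the deadlock ingredient: a log writer blocks once the pipe
// buffer fills up because nobody on the other side is reading yet.
public class LogPipeDeadlockDemo {
    public static void main(String[] args) throws Exception {
        PipedInputStream logPipeReadEnd = new PipedInputStream(1024); // tiny buffer, like a full named pipe
        PipedOutputStream logPipeWriteEnd = new PipedOutputStream(logPipeReadEnd);

        Thread cppProcess = new Thread(() -> {
            try {
                byte[] message = "INFO  startup progress...\n".getBytes(StandardCharsets.UTF_8);
                for (int i = 0; i < 1000; i++) {
                    logPipeWriteEnd.write(message); // blocks once the 1 KiB buffer is full
                }
                System.out.println("finished logging; would now open the next pipe");
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        cppProcess.setDaemon(true); // let the JVM exit even though the writer stays blocked
        cppProcess.start();

        // The "JVM" side under the old ordering: it does not read the log pipe yet,
        // it would first try to connect the remaining pipes, so the writer never unblocks.
        cppProcess.join(2_000);
        System.out.println("writer still blocked after 2s: " + cppProcess.isAlive());
    }
}

The fix in this commit is to start a reader thread for the log pipe before connecting anything else, which is what the ProcessPipes and AbstractNativeProcess changes below implement.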
@@ -8,11 +8,10 @@ package org.elasticsearch.xpack.ml.dataframe.process;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.ml.process.AbstractNativeProcess;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.process.ProcessResultsParser;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Iterator;
@@ -26,12 +25,10 @@ abstract class AbstractNativeAnalyticsProcess<Result> extends AbstractNativeProc
private final ProcessResultsParser<Result> resultsParser;

protected AbstractNativeAnalyticsProcess(String name, ConstructingObjectParser<Result, Void> resultParser, String jobId,
InputStream logStream, OutputStream processInStream,
InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields,
ProcessPipes processPipes, int numberOfFields,
List<Path> filesToDelete, Consumer<String> onProcessCrash, Duration processConnectTimeout,
NamedXContentRegistry namedXContentRegistry) {
super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash,
processConnectTimeout);
super(jobId, processPipes, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout);
this.name = Objects.requireNonNull(name);
this.resultsParser = new ProcessResultsParser<>(Objects.requireNonNull(resultParser), namedXContentRegistry);
}
@@ -8,10 +8,10 @@ package org.elasticsearch.xpack.ml.dataframe.process;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.process.StateToProcessWriterHelper;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Duration;
@@ -25,11 +25,11 @@ public class NativeAnalyticsProcess extends AbstractNativeAnalyticsProcess<Analy

private final AnalyticsProcessConfig config;

protected NativeAnalyticsProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
protected NativeAnalyticsProcess(String jobId, ProcessPipes processPipes,
int numberOfFields, List<Path> filesToDelete,
Consumer<String> onProcessCrash, Duration processConnectTimeout, AnalyticsProcessConfig config,
NamedXContentRegistry namedXContentRegistry) {
super(NAME, AnalyticsResult.PARSER, jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields,
super(NAME, AnalyticsResult.PARSER, jobId, processPipes, numberOfFields,
filesToDelete, onProcessCrash, processConnectTimeout, namedXContentRegistry);
this.config = Objects.requireNonNull(config);
}
@@ -75,48 +75,48 @@ public class NativeAnalyticsProcessFactory implements AnalyticsProcessFactory<An
String jobId = config.getId();
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, jobId,
true, false, true, true, state != null, config.getAnalysis().persistsState());
false, true, true, state != null, config.getAnalysis().persistsState());

// The extra 2 are for the checksum and the control field
int numberOfFields = analyticsProcessConfig.cols() + 2;

createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes);
createNativeProcess(jobId, analyticsProcessConfig, filesToDelete, processPipes, executorService);

NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes.getLogStream().get(),
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(),
processPipes.getRestoreStream().orElse(null), numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout,
NativeAnalyticsProcess analyticsProcess = new NativeAnalyticsProcess(jobId, processPipes,
numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout,
analyticsProcessConfig, namedXContentRegistry);

try {
startProcess(config, executorService, processPipes, analyticsProcess);
return analyticsProcess;
} catch (EsRejectedExecutionException e) {
} catch (IOException | EsRejectedExecutionException e) {
String msg = "Failed to connect to data frame analytics process for job " + jobId;
LOGGER.error(msg);
try {
IOUtils.close(analyticsProcess);
} catch (IOException ioe) {
LOGGER.error("Can't close data frame analytics process", ioe);
}
throw e;
throw ExceptionsHelper.serverError(msg, e);
}
}

private void startProcess(DataFrameAnalyticsConfig config, ExecutorService executorService, ProcessPipes processPipes,
NativeAnalyticsProcess process) {
NativeAnalyticsProcess process) throws IOException {
if (config.getAnalysis().persistsState()) {
IndexingStateProcessor stateProcessor = new IndexingStateProcessor(config.getId(), resultsPersisterService, auditor);
process.start(executorService, stateProcessor, processPipes.getPersistStream().get());
process.start(executorService, stateProcessor);
} else {
process.start(executorService);
}
}

private void createNativeProcess(String jobId, AnalyticsProcessConfig analyticsProcessConfig, List<Path> filesToDelete,
ProcessPipes processPipes) {
ProcessPipes processPipes, ExecutorService executorService) {
AnalyticsBuilder analyticsBuilder =
new AnalyticsBuilder(env::tmpFile, nativeController, processPipes, analyticsProcessConfig, filesToDelete);
try {
analyticsBuilder.build();
processPipes.connectStreams(processConnectTimeout);
} catch (IOException e) {
String msg = "Failed to launch data frame analytics process for job " + jobId;
LOGGER.error(msg);
@@ -8,9 +8,8 @@ package org.elasticsearch.xpack.ml.dataframe.process;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.ml.dataframe.process.results.MemoryUsageEstimationResult;
import org.elasticsearch.xpack.ml.process.ProcessPipes;

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
@@ -20,11 +19,10 @@ public class NativeMemoryUsageEstimationProcess extends AbstractNativeAnalyticsP

private static final String NAME = "memory_usage_estimation";

protected NativeMemoryUsageEstimationProcess(String jobId, InputStream logStream,
OutputStream processInStream, InputStream processOutStream,
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
protected NativeMemoryUsageEstimationProcess(String jobId, ProcessPipes processPipes,
int numberOfFields, List<Path> filesToDelete,
Consumer<String> onProcessCrash, Duration processConnectTimeout) {
super(NAME, MemoryUsageEstimationResult.PARSER, jobId, logStream, processInStream, processOutStream, processRestoreStream,
super(NAME, MemoryUsageEstimationResult.PARSER, jobId, processPipes,
numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout, NamedXContentRegistry.EMPTY);
}
@@ -62,17 +62,13 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce
Consumer<String> onProcessCrash) {
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(
env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, config.getId(), true, false, false, true, false, false);
env, NAMED_PIPE_HELPER, AnalyticsBuilder.ANALYTICS, config.getId(), false, false, true, false, false);

createNativeProcess(config.getId(), analyticsProcessConfig, filesToDelete, processPipes);

NativeMemoryUsageEstimationProcess process = new NativeMemoryUsageEstimationProcess(
config.getId(),
processPipes.getLogStream().get(),
// Memory estimation process does not use the input pipe, hence null.
null,
processPipes.getProcessOutStream().get(),
null,
processPipes,
0,
filesToDelete,
onProcessCrash,
@@ -81,13 +77,15 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce
try {
process.start(executorService);
return process;
} catch (EsRejectedExecutionException e) {
} catch (IOException | EsRejectedExecutionException e) {
String msg = "Failed to connect to data frame analytics memory usage estimation process for job " + config.getId();
LOGGER.error(msg);
try {
IOUtils.close(process);
} catch (IOException ioe) {
LOGGER.error("Can't close data frame analytics memory usage estimation process", ioe);
}
throw e;
throw ExceptionsHelper.serverError(msg, e);
}
}

@@ -98,7 +96,6 @@ public class NativeMemoryUsageEstimationProcessFactory implements AnalyticsProce
.performMemoryUsageEstimationOnly();
try {
analyticsBuilder.build();
processPipes.connectStreams(processConnectTimeout);
} catch (IOException e) {
String msg = "Failed to launch data frame analytics memory usage estimation process for job " + jobId;
LOGGER.error(msg);
@@ -20,10 +20,10 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectControlMsgWriter;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.process.AbstractNativeProcess;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.process.ProcessResultsParser;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Duration;
@@ -42,12 +42,11 @@ class NativeAutodetectProcess extends AbstractNativeProcess implements Autodetec

private final ProcessResultsParser<AutodetectResult> resultsParser;

NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
NativeAutodetectProcess(String jobId, ProcessPipes processPipes,
int numberOfFields, List<Path> filesToDelete,
ProcessResultsParser<AutodetectResult> resultsParser, Consumer<String> onProcessCrash,
Duration processConnectTimeout) {
super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash,
processConnectTimeout);
super(jobId, processPipes, numberOfFields, filesToDelete, onProcessCrash, processConnectTimeout);
this.resultsParser = resultsParser;
}
@@ -77,7 +77,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
Consumer<String> onProcessCrash) {
List<Path> filesToDelete = new ArrayList<>();
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, AutodetectBuilder.AUTODETECT, job.getId(),
true, false, true, true, params.modelSnapshot() != null,
false, true, true, params.modelSnapshot() != null,
!AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING.get(settings));
createNativeProcess(job, params, processPipes, filesToDelete);
boolean includeTokensField = MachineLearning.CATEGORIZATION_TOKENIZATION_IN_JAVA
@@ -89,19 +89,20 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
ProcessResultsParser<AutodetectResult> resultsParser = new ProcessResultsParser<>(AutodetectResult.PARSER,
NamedXContentRegistry.EMPTY);
NativeAutodetectProcess autodetect = new NativeAutodetectProcess(
job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(),
processPipes.getProcessOutStream().get(), processPipes.getRestoreStream().orElse(null), numberOfFields,
job.getId(), processPipes, numberOfFields,
filesToDelete, resultsParser, onProcessCrash, processConnectTimeout);
try {
autodetect.start(executorService, stateProcessor, processPipes.getPersistStream().get());
autodetect.start(executorService, stateProcessor);
return autodetect;
} catch (EsRejectedExecutionException e) {
} catch (IOException | EsRejectedExecutionException e) {
String msg = "Failed to connect to autodetect for job " + job.getId();
LOGGER.error(msg);
try {
IOUtils.close(autodetect);
} catch (IOException ioe) {
LOGGER.error("Can't close autodetect", ioe);
}
throw e;
throw ExceptionsHelper.serverError(msg, e);
}
}

@@ -126,7 +127,6 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
autodetectBuilder.quantiles(autodetectParams.quantiles());
}
autodetectBuilder.build();
processPipes.connectStreams(processConnectTimeout);
} catch (IOException e) {
String msg = "Failed to launch autodetect for job " + job.getId();
LOGGER.error(msg);
@@ -7,9 +7,8 @@ package org.elasticsearch.xpack.ml.job.process.normalizer;

import org.elasticsearch.xpack.ml.job.process.normalizer.output.NormalizerResultHandler;
import org.elasticsearch.xpack.ml.process.AbstractNativeProcess;
import org.elasticsearch.xpack.ml.process.ProcessPipes;

import java.io.InputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.Collections;

@@ -20,9 +19,8 @@ class NativeNormalizerProcess extends AbstractNativeProcess implements Normalize

private static final String NAME = "normalizer";

NativeNormalizerProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
Duration processConnectTimeout) {
super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), (ignore) -> {}, processConnectTimeout);
NativeNormalizerProcess(String jobId, ProcessPipes processPipes, Duration processConnectTimeout) {
super(jobId, processPipes, 0, Collections.emptyList(), (ignore) -> {}, processConnectTimeout);
}

@Override
@@ -55,22 +55,23 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
// in quick succession for the same job the job ID alone is not sufficient to guarantee that the normalizer process pipe names
// are unique. Therefore an increasing counter value is appended to the job ID to ensure uniqueness between calls.
ProcessPipes processPipes = new ProcessPipes(env, NAMED_PIPE_HELPER, NormalizerBuilder.NORMALIZE,
jobId + "_" + counter.incrementAndGet(), true, false, true, true, false, false);
jobId + "_" + counter.incrementAndGet(), false, true, true, false, false);
createNativeProcess(jobId, quantilesState, processPipes, bucketSpan);

NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes.getLogStream().get(),
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), processConnectTimeout);
NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes, processConnectTimeout);

try {
normalizerProcess.start(executorService);
return normalizerProcess;
} catch (EsRejectedExecutionException e) {
} catch (IOException | EsRejectedExecutionException e) {
String msg = "Failed to connect to normalizer for job " + jobId;
LOGGER.error(msg);
try {
IOUtils.close(normalizerProcess);
} catch (IOException ioe) {
LOGGER.error("Can't close normalizer", ioe);
}
throw e;
throw ExceptionsHelper.serverError(msg, e);
}
}

@@ -80,7 +81,6 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory
List<String> command = new NormalizerBuilder(env, jobId, quantilesState, bucketSpan).build();
processPipes.addArgs(command);
nativeController.startProcess(command);
processPipes.connectStreams(processConnectTimeout);
} catch (IOException e) {
String msg = "Failed to launch normalizer for job " + jobId;
LOGGER.error(msg);
@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.process;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -43,14 +44,15 @@ public abstract class AbstractNativeProcess implements NativeProcess {
private static final Duration WAIT_FOR_KILL_TIMEOUT = Duration.ofMillis(1000);

private final String jobId;
private final CppLogMessageHandler cppLogHandler;
private final OutputStream processInStream;
private final ProcessPipes processPipes;
private final SetOnce<CppLogMessageHandler> cppLogHandler = new SetOnce<>();
// We need this as in Java 8 closing {@link FilterOutputStream} is not idempotent (i.e. cannot be performed twice).
// For more details regarding the underlying issue see https://bugs.openjdk.java.net/browse/JDK-8054565
private final AtomicBoolean processInStreamClosed = new AtomicBoolean();
private final InputStream processOutStream;
private final OutputStream processRestoreStream;
private final LengthEncodedWriter recordWriter;
private final SetOnce<OutputStream> processInStream = new SetOnce<>();
private final SetOnce<InputStream> processOutStream = new SetOnce<>();
private final SetOnce<OutputStream> processRestoreStream = new SetOnce<>();
private final SetOnce<LengthEncodedWriter> recordWriter = new SetOnce<>();
private final ZonedDateTime startTime;
private final int numberOfFields;
private final List<Path> filesToDelete;
@@ -62,15 +64,11 @@ public abstract class AbstractNativeProcess implements NativeProcess {
private volatile boolean processKilled;
private volatile boolean isReady;

protected AbstractNativeProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete,
Consumer<String> onProcessCrash, Duration processConnectTimeout) {
protected AbstractNativeProcess(String jobId, ProcessPipes processPipes,
int numberOfFields, List<Path> filesToDelete, Consumer<String> onProcessCrash,
Duration processConnectTimeout) {
this.jobId = jobId;
this.cppLogHandler = new CppLogMessageHandler(jobId, logStream);
this.processInStream = processInStream != null ? new BufferedOutputStream(processInStream) : null;
this.processOutStream = processOutStream;
this.processRestoreStream = processRestoreStream;
this.recordWriter = new LengthEncodedWriter(this.processInStream);
this.processPipes = processPipes;
this.startTime = ZonedDateTime.now();
this.numberOfFields = numberOfFields;
this.filesToDelete = filesToDelete;
@@ -81,12 +79,18 @@ public abstract class AbstractNativeProcess implements NativeProcess {
public abstract String getName();

/**
* Starts a process that does not persist any state
* Connects the Java side of an ML process to the named pipes that connect it to the C++ side,
* and starts tailing the C++ logs. Stores references to all the streams except the state
* persistence stream.
* @param executorService the executor service to run on
*/
public void start(ExecutorService executorService) {
public void start(ExecutorService executorService) throws IOException {

processPipes.connectLogStream(processConnectTimeout);
cppLogHandler.set(processPipes.getLogStreamHandler());

logTailFuture = executorService.submit(() -> {
try (CppLogMessageHandler h = cppLogHandler) {
try (CppLogMessageHandler h = cppLogHandler.get()) {
h.tailStream();
} catch (IOException e) {
if (processKilled == false) {
@@ -96,6 +100,14 @@ public abstract class AbstractNativeProcess implements NativeProcess {
detectCrash();
}
});

processPipes.connectOtherStreams(processConnectTimeout);
if (processPipes.getProcessInStream().isPresent()) {
processInStream.set(new BufferedOutputStream(processPipes.getProcessInStream().get()));
this.recordWriter.set(new LengthEncodedWriter(processInStream.get()));
}
processOutStream.set(processPipes.getProcessOutStream().orElse(null));
processRestoreStream.set(processPipes.getRestoreStream().orElse(null));
}

/**
@@ -106,14 +118,14 @@ public abstract class AbstractNativeProcess implements NativeProcess {
// Do not detect crash when the process is being closed or killed.
return;
}
if (processInStream == null) {
if (processInStream() == null) {
// Do not detect crash when the process has been closed automatically.
// This is possible when the process does not have input pipe to hang on and closes right after writing its output.
return;
}
// The log message doesn't say "crashed", as the process could have been killed
// by a user or other process (e.g. the Linux OOM killer)
String errors = cppLogHandler.getErrors();
String errors = cppLogHandler().getErrors();
String fullError = String.format(Locale.ROOT, "[%s] %s process stopped unexpectedly: %s", jobId, getName(), errors);
LOGGER.error(fullError);
onProcessCrash.accept(fullError);
@@ -123,13 +135,13 @@ public abstract class AbstractNativeProcess implements NativeProcess {
* Starts a process that may persist its state
* @param executorService the executor service to run on
* @param stateProcessor the state processor
* @param persistStream the stream where the state is persisted
*/
public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream persistStream) {
public void start(ExecutorService executorService, StateProcessor stateProcessor) throws IOException {
start(executorService);

assert processPipes.getPersistStream().isPresent();
stateProcessorFuture = executorService.submit(() -> {
try (InputStream in = persistStream) {
try (InputStream in = processPipes.getPersistStream().get()) {
stateProcessor.process(in);
if (processKilled == false) {
LOGGER.info("[{}] State output finished", jobId);
@@ -153,12 +165,12 @@ public abstract class AbstractNativeProcess implements NativeProcess {

@Override
public void writeRecord(String[] record) throws IOException {
recordWriter.writeRecord(record);
recordWriter().writeRecord(record);
}

@Override
public void flushStream() throws IOException {
recordWriter.flush();
recordWriter().flush();
}

@Override
@@ -166,10 +178,10 @@ public abstract class AbstractNativeProcess implements NativeProcess {
try {
processCloseInitiated = true;
// closing its input causes the process to exit
if (processInStream != null) {
if (processInStream() != null) {
// Make sure {@code processInStream.close()} is called at most once.
if (processInStreamClosed.compareAndSet(false, true)) {
processInStream.close();
processInStream().close();
}
}
// wait for the process to exit by waiting for end-of-file on the named pipe connected
@@ -185,8 +197,8 @@ public abstract class AbstractNativeProcess implements NativeProcess {
logTailFuture.get(5, TimeUnit.SECONDS);
}

if (cppLogHandler.seenFatalError()) {
throw ExceptionsHelper.serverError(cppLogHandler.getErrors());
if (cppLogHandler().seenFatalError()) {
throw ExceptionsHelper.serverError(cppLogHandler().getErrors());
}
LOGGER.debug("[{}] {} process exited", jobId, getName());
} catch (ExecutionException | TimeoutException e) {
@@ -206,19 +218,19 @@ public abstract class AbstractNativeProcess implements NativeProcess {
try {
// The PID comes via the processes log stream. We do wait here to give the process the time to start up and report its PID.
// Without the PID we cannot kill the process.
NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(processConnectTimeout));
NativeControllerHolder.getNativeController().killProcess(cppLogHandler().getPid(processConnectTimeout));

// Wait for the process to die before closing processInStream as if the process
// is still alive when processInStream is closed it may start persisting state
cppLogHandler.waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT);
cppLogHandler().waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT);
} catch (TimeoutException e) {
LOGGER.warn("[{}] Failed to get PID of {} process to kill", jobId, getName());
} finally {
try {
if (processInStream != null) {
if (processInStream() != null) {
// Make sure {@code processInStream.close()} is called at most once.
if (processInStreamClosed.compareAndSet(false, true)) {
processInStream.close();
processInStream().close();
}
}
} catch (IOException e) {
@@ -256,18 +268,18 @@ public abstract class AbstractNativeProcess implements NativeProcess {
@Override
public boolean isProcessAlive() {
// Sanity check: make sure the process hasn't terminated already
return !cppLogHandler.hasLogStreamEnded();
return cppLogHandler().hasLogStreamEnded() == false;
}

@Override
public boolean isProcessAliveAfterWaiting() {
cppLogHandler.waitForLogStreamClose(Duration.ofMillis(45));
cppLogHandler().waitForLogStreamClose(Duration.ofMillis(45));
return isProcessAlive();
}

@Override
public String readError() {
return cppLogHandler.getErrors();
return cppLogHandler().getErrors();
}

protected String jobId() {
@@ -275,12 +287,17 @@ public abstract class AbstractNativeProcess implements NativeProcess {
}

protected InputStream processOutStream() {
return processOutStream;
return processOutStream.get();
}

@Nullable
private OutputStream processInStream() {
return processInStream.get();
}

@Nullable
protected OutputStream processRestoreStream() {
return processRestoreStream;
return processRestoreStream.get();
}

protected int numberOfFields() {
@@ -288,7 +305,11 @@ public abstract class AbstractNativeProcess implements NativeProcess {
}

protected LengthEncodedWriter recordWriter() {
return recordWriter;
return recordWriter.get();
}

protected CppLogMessageHandler cppLogHandler() {
return cppLogHandler.get();
}

protected boolean isProcessKilled() {
@@ -58,19 +58,20 @@ public class NativeController {

NativeController(String localNodeName, Environment env, NamedPipeHelper namedPipeHelper) throws IOException {
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, CONTROLLER, null,
true, true, false, false, false, false);
processPipes.connectStreams(CONTROLLER_CONNECT_TIMEOUT);
true, false, false, false, false);
processPipes.connectLogStream(CONTROLLER_CONNECT_TIMEOUT);
tailLogsInThread(processPipes.getLogStreamHandler());
processPipes.connectOtherStreams(CONTROLLER_CONNECT_TIMEOUT);
this.localNodeName = localNodeName;
this.cppLogHandler = new CppLogMessageHandler(null, processPipes.getLogStream().get());
this.cppLogHandler = processPipes.getLogStreamHandler();
this.commandStream = new BufferedOutputStream(processPipes.getCommandStream().get());
}

void tailLogsInThread() {
static void tailLogsInThread(CppLogMessageHandler cppLogHandler) {
final Thread logTailThread = new Thread(
() -> {
try {
cppLogHandler.tailStream();
cppLogHandler.close();
try (CppLogMessageHandler h = cppLogHandler) {
h.tailStream();
} catch (IOException e) {
LOGGER.error("Error tailing C++ controller logs", e);
}
@@ -38,7 +38,6 @@ public class NativeControllerHolder {
synchronized (lock) {
if (nativeController == null) {
nativeController = new NativeController(localNodeName, environment, new NamedPipeHelper());
nativeController.tailLogsInThread();
}
}
return nativeController;
@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.process;
import org.elasticsearch.common.Strings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;

import java.io.IOException;
@@ -37,6 +38,7 @@ public class ProcessPipes {
public static final String PERSIST_IS_PIPE_ARG = "--persistIsPipe";

private final NamedPipeHelper namedPipeHelper;
private final String jobId;

/**
* <code>null</code> indicates a pipe won't be used
@@ -48,7 +50,7 @@ public class ProcessPipes {
private final String restorePipeName;
private final String persistPipeName;

private InputStream logStream;
private CppLogMessageHandler logStreamHandler;
private OutputStream commandStream;
private OutputStream processInStream;
private InputStream processOutStream;
@@ -65,9 +67,10 @@ public class ProcessPipes {
* May be null or empty for processes not associated with a specific job.
*/
public ProcessPipes(Environment env, NamedPipeHelper namedPipeHelper, String processName, String jobId,
boolean wantLogPipe, boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe,
boolean wantCommandPipe, boolean wantProcessInPipe, boolean wantProcessOutPipe,
boolean wantRestorePipe, boolean wantPersistPipe) {
this.namedPipeHelper = namedPipeHelper;
this.jobId = jobId;

// The way the pipe names are formed MUST match what is done in the controller main()
// function, as it does not get any command line arguments when started as a daemon. If
@@ -80,7 +83,7 @@ public class ProcessPipes {
}
String prefix = prefixBuilder.toString();
String suffix = String.format(Locale.ROOT, "_%d", JvmInfo.jvmInfo().getPid());
logPipeName = wantLogPipe ? String.format(Locale.ROOT, "%slog%s", prefix, suffix) : null;
logPipeName = String.format(Locale.ROOT, "%slog%s", prefix, suffix);
commandPipeName = wantCommandPipe ? String.format(Locale.ROOT, "%scommand%s", prefix, suffix) : null;
processInPipeName = wantProcessInPipe ? String.format(Locale.ROOT, "%sinput%s", prefix, suffix) : null;
processOutPipeName = wantProcessOutPipe ? String.format(Locale.ROOT, "%soutput%s", prefix, suffix) : null;
@@ -92,9 +95,7 @@ public class ProcessPipes {
* Augments a list of command line arguments, for example that built up by the AutodetectBuilder class.
*/
public void addArgs(List<String> command) {
if (logPipeName != null) {
command.add(LOG_PIPE_ARG + logPipeName);
}
command.add(LOG_PIPE_ARG + logPipeName);
if (commandPipeName != null) {
command.add(COMMAND_PIPE_ARG + commandPipeName);
}
@@ -118,19 +119,35 @@ public class ProcessPipes {
}

/**
* Connect the pipes created by the C++ process. This must be called after the corresponding C++ process has been started.
* Connect the log pipe created by the C++ process. It must be connected before any other pipes <em>and a thread must be
* started to read from it</em> so that there is no risk of messages logged in between creation of the other pipes on the C++
* side blocking due to filling up the named pipe's buffer, and hence deadlocking communications between that process
* and this JVM.
* @param timeout Needs to be long enough for the C++ process to perform all startup tasks that precede creation of named pipes.
*                There should not be very many of these, so a short timeout should be fine. However, at least a couple of
*                seconds is recommended due to the vagaries of process scheduling and the way VMs can completely stall for
*                some hypervisor actions.
*/
public void connectStreams(Duration timeout) throws IOException {
public void connectLogStream(Duration timeout) throws IOException {
logStreamHandler = new CppLogMessageHandler(jobId, namedPipeHelper.openNamedPipeInputStream(logPipeName, timeout));
}

/**
* Connect the other pipes created by the C++ process after the logging pipe has been connected. This must be called after
* the corresponding C++ process has been started, and after {@link #connectLogStream}.
* @param timeout Needs to be long enough for the C++ process to perform all startup tasks that precede creation of named pipes.
*                There should not be very many of these, so a short timeout should be fine. However, at least a couple of
*                seconds is recommended due to the vagaries of process scheduling and the way VMs can completely stall for
*                some hypervisor actions.
*/
public void connectOtherStreams(Duration timeout) throws IOException {
assert logStreamHandler != null : "Must connect log stream before other streams";
if (logStreamHandler == null) {
throw new NullPointerException("Must connect log stream before other streams");
}
// The order here is important. It must match the order that the C++ process tries to connect to the pipes, otherwise
// a timeout is guaranteed. Also change api::CIoManager in the C++ code if changing the order here.
try {
if (logPipeName != null) {
logStream = namedPipeHelper.openNamedPipeInputStream(logPipeName, timeout);
}
if (commandPipeName != null) {
commandStream = namedPipeHelper.openNamedPipeOutputStream(commandPipeName, timeout);
}
@@ -157,8 +174,8 @@ public class ProcessPipes {
}

private void closeUnusedStreams() throws IOException {
if (logStream != null) {
logStream.close();
if (logStreamHandler != null) {
logStreamHandler.close();
}
if (commandStream != null) {
commandStream.close();
@@ -177,15 +194,11 @@ public class ProcessPipes {
}
}

public Optional<InputStream> getLogStream() {
// Distinguish between pipe not wanted and pipe wanted but not successfully connected
if (logPipeName == null) {
return Optional.empty();
}
if (logStream == null) {
public CppLogMessageHandler getLogStreamHandler() {
if (logStreamHandler == null) {
throw new IllegalStateException("process streams must be connected before use");
}
return Optional.of(logStream);
return logStreamHandler;
}

public Optional<OutputStream> getCommandStream() {
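To restate the contract spelled out in the connectLogStream and connectOtherStreams javadoc above, here is a caller-side sketch of the intended order, modelled on the NativeController constructor and AbstractNativeProcess.start() earlier in this diff. The wrapper method, parameter names, and omitted imports and error handling are illustrative assumptions, not code from the commit.

// Sketch of how a factory is expected to drive the split ProcessPipes API
// (assumes the Elasticsearch ML classes from this diff are on the classpath).
static void connectPipes(ProcessPipes processPipes, ExecutorService executorService, Duration timeout) throws IOException {
    processPipes.connectLogStream(timeout);                   // 1. open the log pipe first
    CppLogMessageHandler logHandler = processPipes.getLogStreamHandler();
    executorService.submit(() -> {                            // 2. start reading it straight away
        try (CppLogMessageHandler h = logHandler) {
            h.tailStream();
        } catch (IOException e) {
            // the process died or was killed; nothing more to tail
        }
    });
    processPipes.connectOtherStreams(timeout);                // 3. only then open the remaining pipes
}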
@@ -1,68 +0,0 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml.job.process.autodetect;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;

import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class NativeAutodetectProcessFactoryTests extends ESTestCase {

public void testSetProcessConnectTimeout() throws IOException {

int timeoutSeconds = randomIntBetween(5, 100);

Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString())
.build();
Environment env = TestEnvironment.newEnvironment(settings);
NativeController nativeController = mock(NativeController.class);
ResultsPersisterService resultsPersisterService = mock(ResultsPersisterService.class);
AnomalyDetectionAuditor anomalyDetectionAuditor = mock(AnomalyDetectionAuditor.class);
ClusterSettings clusterSettings = new ClusterSettings(settings,
Sets.newHashSet(MachineLearning.PROCESS_CONNECT_TIMEOUT, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC));
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
Job job = mock(Job.class);
when(job.getId()).thenReturn("set_process_connect_test_job");
AutodetectParams autodetectParams = mock(AutodetectParams.class);
ProcessPipes processPipes = mock(ProcessPipes.class);

NativeAutodetectProcessFactory nativeAutodetectProcessFactory = new NativeAutodetectProcessFactory(
env,
settings,
nativeController,
clusterService,
resultsPersisterService,
anomalyDetectionAuditor);
nativeAutodetectProcessFactory.setProcessConnectTimeout(TimeValue.timeValueSeconds(timeoutSeconds));
nativeAutodetectProcessFactory.createNativeProcess(job, autodetectParams, processPipes, Collections.emptyList());

verify(processPipes, times(1)).connectStreams(eq(Duration.ofSeconds(timeoutSeconds)));
}
}
@@ -14,7 +14,9 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AutodetectControlMsgWriter;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.process.ProcessResultsParser;
import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;
import org.junit.Assert;
import org.junit.Before;

@@ -38,7 +40,10 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class NativeAutodetectProcessTests extends ESTestCase {
@@ -46,25 +51,38 @@ public class NativeAutodetectProcessTests extends ESTestCase {
private static final int NUMBER_FIELDS = 5;

private ExecutorService executorService;
private CppLogMessageHandler cppLogHandler;
private ByteArrayOutputStream inputStream;
private InputStream outputStream;
private OutputStream restoreStream;
private InputStream persistStream;
private ProcessPipes processPipes;

@Before
@SuppressWarnings("unchecked")
public void initialize() {
executorService = mock(ExecutorService.class);
when(executorService.submit(any(Runnable.class))).thenReturn(mock(Future.class));
cppLogHandler = mock(CppLogMessageHandler.class);
when(cppLogHandler.getErrors()).thenReturn("");
inputStream = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024);
outputStream = new ByteArrayInputStream("some string of data".getBytes(StandardCharsets.UTF_8));
restoreStream = mock(OutputStream.class);
persistStream = mock(InputStream.class);
processPipes = mock(ProcessPipes.class);
when(processPipes.getLogStreamHandler()).thenReturn(cppLogHandler);
when(processPipes.getProcessInStream()).thenReturn(Optional.of(inputStream));
when(processPipes.getProcessOutStream()).thenReturn(Optional.of(outputStream));
when(processPipes.getRestoreStream()).thenReturn(Optional.of(restoreStream));
when(processPipes.getPersistStream()).thenReturn(Optional.of(persistStream));
}

@SuppressWarnings("unchecked")
public void testProcessStartTime() throws Exception {
InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1);
InputStream outputStream = mock(InputStream.class);
when(outputStream.read(new byte[512])).thenReturn(-1);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
mock(OutputStream.class), outputStream, mock(OutputStream.class),
NUMBER_FIELDS, null,
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo",
processPipes, NUMBER_FIELDS, null,
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
process.start(executorService, mock(IndexingStateProcessor.class));

ZonedDateTime startTime = process.getProcessStartTime();
Thread.sleep(500);
@@ -78,21 +96,16 @@ public class NativeAutodetectProcessTests extends ESTestCase {

@SuppressWarnings("unchecked")
public void testWriteRecord() throws IOException {
InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1);
InputStream outputStream = mock(InputStream.class);
when(outputStream.read(new byte[512])).thenReturn(-1);
String[] record = {"r1", "r2", "r3", "r4", "r5"};
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo",
processPipes, NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
process.start(executorService, mock(IndexingStateProcessor.class));

process.writeRecord(record);
process.flushStream();

ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
ByteBuffer bb = ByteBuffer.wrap(inputStream.toByteArray());

// read header
int numFields = bb.getInt();
@@ -114,20 +127,15 @@ public class NativeAutodetectProcessTests extends ESTestCase {

@SuppressWarnings("unchecked")
public void testFlush() throws IOException {
InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1);
InputStream outputStream = mock(InputStream.class);
when(outputStream.read(new byte[512])).thenReturn(-1);
ByteArrayOutputStream bos = new ByteArrayOutputStream(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH + 1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo",
processPipes, NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
process.start(executorService, mock(IndexingStateProcessor.class));

FlushJobParams params = FlushJobParams.builder().build();
process.flushJob(params);

ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray());
ByteBuffer bb = ByteBuffer.wrap(inputStream.toByteArray());
assertThat(bb.remaining(), is(greaterThan(AutodetectControlMsgWriter.FLUSH_SPACES_LENGTH)));
}
}
@@ -142,44 +150,52 @@ public class NativeAutodetectProcessTests extends ESTestCase {
}

public void testPersistJob() throws IOException {
testWriteMessage(p -> p.persistState(), AutodetectControlMsgWriter.BACKGROUND_PERSIST_MESSAGE_CODE);
testWriteMessage(NativeAutodetectProcess::persistState, AutodetectControlMsgWriter.BACKGROUND_PERSIST_MESSAGE_CODE);
}

@SuppressWarnings("unchecked")
public void testConsumeAndCloseOutputStream() throws IOException {
InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1);
OutputStream processInStream = mock(OutputStream.class);
String json = "some string of data";
ByteArrayInputStream processOutStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));

try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
processInStream, processOutStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<AutodetectResult>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo",
processPipes, NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
Duration.ZERO)) {

process.start(executorService);
process.consumeAndCloseOutputStream();
assertThat(processOutStream.available(), equalTo(0));
assertThat(outputStream.available(), equalTo(0));
}
}

@SuppressWarnings("unchecked")
public void testPipeConnectTimeout() throws IOException {

int timeoutSeconds = randomIntBetween(5, 100);

try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo",
processPipes, NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class),
Duration.ofSeconds(timeoutSeconds))) {

process.start(executorService);
}

verify(processPipes, times(1)).connectLogStream(eq(Duration.ofSeconds(timeoutSeconds)));
verify(processPipes, times(1)).connectOtherStreams(eq(Duration.ofSeconds(timeoutSeconds)));
}

@SuppressWarnings("unchecked")
private void testWriteMessage(CheckedConsumer<NativeAutodetectProcess> writeFunction, String expectedMessageCode) throws IOException {
InputStream logStream = mock(InputStream.class);
when(logStream.read(new byte[1024])).thenReturn(-1);
InputStream outputStream = mock(InputStream.class);
when(outputStream.read(new byte[512])).thenReturn(-1);
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
bos, outputStream, mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(),
try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo",
processPipes, NUMBER_FIELDS, Collections.emptyList(),
new ProcessResultsParser<>(AutodetectResult.PARSER, NamedXContentRegistry.EMPTY), mock(Consumer.class), Duration.ZERO)) {
process.start(executorService, mock(IndexingStateProcessor.class), mock(InputStream.class));
process.start(executorService, mock(IndexingStateProcessor.class));

writeFunction.accept(process);
process.writeUpdateModelPlotMessage(new ModelPlotConfig());
process.flushStream();

String message = new String(bos.toByteArray(), StandardCharsets.UTF_8);
String message = new String(inputStream.toByteArray(), StandardCharsets.UTF_8);
assertTrue(message.contains(expectedMessageCode));
}
}
@ -10,6 +10,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
@ -17,6 +18,7 @@ import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -25,6 +27,7 @@ import java.util.function.Consumer;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
@ -32,10 +35,11 @@ import static org.mockito.Mockito.when;
|
||||
|
||||
public class AbstractNativeProcessTests extends ESTestCase {
|
||||
|
||||
private InputStream logStream;
|
||||
private CppLogMessageHandler cppLogHandler;
|
||||
private OutputStream inputStream;
|
||||
private InputStream outputStream;
|
||||
private OutputStream restoreStream;
|
||||
private ProcessPipes processPipes;
|
||||
private Consumer<String> onProcessCrash;
|
||||
private ExecutorService executorService;
|
||||
private CountDownLatch wait = new CountDownLatch(1);
|
||||
@ -43,18 +47,24 @@ public class AbstractNativeProcessTests extends ESTestCase {
|
||||
@Before
|
||||
@SuppressWarnings("unchecked")
|
||||
public void initialize() throws IOException {
|
||||
logStream = mock(InputStream.class);
|
||||
cppLogHandler = mock(CppLogMessageHandler.class);
|
||||
// This answer blocks the thread on the executor service.
|
||||
// In order to unblock it, the test needs to call wait.countDown().
|
||||
when(logStream.read(new byte[1024])).thenAnswer(
|
||||
doAnswer(
|
||||
invocationOnMock -> {
|
||||
wait.await();
|
||||
return -1;
|
||||
});
|
||||
return null;
|
||||
}).when(cppLogHandler).tailStream();
|
||||
when(cppLogHandler.getErrors()).thenReturn("");
|
||||
inputStream = mock(OutputStream.class);
|
||||
outputStream = mock(InputStream.class);
|
||||
when(outputStream.read(new byte[512])).thenReturn(-1);
|
||||
restoreStream = mock(OutputStream.class);
|
||||
restoreStream = mock(OutputStream.class);
|
||||
processPipes = mock(ProcessPipes.class);
|
||||
when(processPipes.getLogStreamHandler()).thenReturn(cppLogHandler);
|
||||
when(processPipes.getProcessInStream()).thenReturn(Optional.of(inputStream));
|
||||
when(processPipes.getProcessOutStream()).thenReturn(Optional.of(outputStream));
|
||||
when(processPipes.getRestoreStream()).thenReturn(Optional.of(restoreStream));
|
||||
onProcessCrash = mock(Consumer.class);
|
||||
executorService = EsExecutors.newFixed("test", 1, 1, EsExecutors.daemonThreadFactory("test"), new ThreadContext(Settings.EMPTY));
|
||||
}
|
||||
@ -66,30 +76,22 @@ public class AbstractNativeProcessTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public void testStart_DoNotDetectCrashWhenNoInputPipeProvided() throws Exception {
try (AbstractNativeProcess process = new TestNativeProcess(null)) {
when(processPipes.getProcessInStream()).thenReturn(Optional.empty());
try (AbstractNativeProcess process = new TestNativeProcess()) {
process.start(executorService);
wait.countDown();
}
}

public void testStart_DoNotDetectCrashWhenProcessIsBeingClosed() throws Exception {
try (AbstractNativeProcess process = new TestNativeProcess(inputStream)) {
try (AbstractNativeProcess process = new TestNativeProcess()) {
process.start(executorService);
process.close();
wait.countDown();
}
}

public void testStart_DoNotDetectCrashWhenProcessIsBeingKilled() throws Exception {
try (AbstractNativeProcess process = new TestNativeProcess(inputStream)) {
process.start(executorService);
process.kill();
wait.countDown();
}
}
public void testStart_DetectCrashWhenInputPipeExists() throws Exception {
try (AbstractNativeProcess process = new TestNativeProcess(inputStream)) {
try (AbstractNativeProcess process = new TestNativeProcess()) {
process.start(executorService);
wait.countDown();
ThreadPool.terminate(executorService, 10, TimeUnit.SECONDS);
@ -99,39 +101,54 @@ public class AbstractNativeProcessTests extends ESTestCase {
}
public void testWriteRecord() throws Exception {
try (AbstractNativeProcess process = new TestNativeProcess(inputStream)) {
try (AbstractNativeProcess process = new TestNativeProcess()) {
process.start(executorService);
process.writeRecord(new String[] {"a", "b", "c"});
process.flushStream();

verify(inputStream).write(any(), anyInt(), anyInt());

wait.countDown();
}
}

public void testWriteRecord_FailWhenNoInputPipeProvided() throws Exception {
try (AbstractNativeProcess process = new TestNativeProcess(null)) {
when(processPipes.getProcessInStream()).thenReturn(Optional.empty());
try (AbstractNativeProcess process = new TestNativeProcess()) {
process.start(executorService);
expectThrows(NullPointerException.class, () -> process.writeRecord(new String[] {"a", "b", "c"}));
wait.countDown();
}
}
public void testFlush() throws Exception {
try (AbstractNativeProcess process = new TestNativeProcess(inputStream)) {
try (AbstractNativeProcess process = new TestNativeProcess()) {
process.start(executorService);
process.flushStream();

verify(inputStream).flush();

wait.countDown();
}
}

public void testFlush_FailWhenNoInputPipeProvided() throws Exception {
try (AbstractNativeProcess process = new TestNativeProcess(null)) {
expectThrows(NullPointerException.class, () -> process.flushStream());
when(processPipes.getProcessInStream()).thenReturn(Optional.empty());
try (AbstractNativeProcess process = new TestNativeProcess()) {
process.start(executorService);
expectThrows(NullPointerException.class, process::flushStream);
wait.countDown();
}
}
public void testIsReady() throws Exception {
try (AbstractNativeProcess process = new TestNativeProcess(null)) {
try (AbstractNativeProcess process = new TestNativeProcess()) {
process.start(executorService);
assertThat(process.isReady(), is(false));
process.setReady();
assertThat(process.isReady(), is(true));

wait.countDown();
}
}

@ -140,8 +157,8 @@ public class AbstractNativeProcessTests extends ESTestCase {
*/
private class TestNativeProcess extends AbstractNativeProcess {
TestNativeProcess(OutputStream inputStream) {
super("foo", logStream, inputStream, outputStream, restoreStream, 0, null, onProcessCrash, Duration.ZERO);
TestNativeProcess() {
super("foo", processPipes, 0, null, onProcessCrash, Duration.ZERO);
}

@Override
@ -150,7 +167,7 @@ public class AbstractNativeProcessTests extends ESTestCase {
}

@Override
public void persistState() throws IOException {
public void persistState() {
}
}
}
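As context for the hunks above: the test now hands the process a ProcessPipes mock whose CppLogMessageHandler blocks inside tailStream() until the test releases a latch, mirroring the log-tailing thread the production code starts first. A minimal sketch of that wiring, assuming the same Mockito and x-pack ML test dependencies used in these tests; the class and method names below are illustrative, not part of the commit.

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.OutputStream;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;

import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;

class BlockingPipesMockSketch {

    static ProcessPipes blockingPipes(CountDownLatch latch) throws Exception {
        CppLogMessageHandler logHandler = mock(CppLogMessageHandler.class);
        // Park the simulated log tailer until the test counts the latch down,
        // like the mocked tailStream() in the initialize() method above.
        doAnswer(invocation -> {
            latch.await();
            return null;
        }).when(logHandler).tailStream();
        when(logHandler.getErrors()).thenReturn("");

        ProcessPipes pipes = mock(ProcessPipes.class);
        when(pipes.getLogStreamHandler()).thenReturn(logHandler);
        when(pipes.getProcessInStream()).thenReturn(Optional.of(mock(OutputStream.class)));
        return pipes;
    }
}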
@ -11,21 +11,24 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;
import org.mockito.Mockito;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.contains;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class NativeControllerTests extends ESTestCase {
@ -40,8 +43,14 @@ public class NativeControllerTests extends ESTestCase {
public void testStartProcessCommand() throws IOException {

NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class);
ByteArrayInputStream logStream = new ByteArrayInputStream(new byte[1]);
NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class);
InputStream logStream = mock(InputStream.class);
CountDownLatch wait = new CountDownLatch(1);
doAnswer(
invocationOnMock -> {
wait.await();
return -1;
}).when(logStream).read(any());
when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream);
ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream);
@ -57,18 +66,19 @@ public class NativeControllerTests extends ESTestCase {

assertEquals("start\tmy_process\t--arg1\t--arg2=42\t--arg3=something with spaces\n",
commandStream.toString(StandardCharsets.UTF_8.name()));

wait.countDown();
}
public void testGetNativeCodeInfo() throws IOException, TimeoutException {

NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class);
NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class);
ByteArrayInputStream logStream = new ByteArrayInputStream(TEST_MESSAGE.getBytes(StandardCharsets.UTF_8));
when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream);
ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream);

NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper);
nativeController.tailLogsInThread();
Map<String, Object> nativeCodeInfo = nativeController.getNativeCodeInfo();

assertNotNull(nativeCodeInfo);
@ -79,14 +89,13 @@ public class NativeControllerTests extends ESTestCase {

public void testControllerDeath() throws Exception {

NamedPipeHelper namedPipeHelper = Mockito.mock(NamedPipeHelper.class);
NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class);
ByteArrayInputStream logStream = new ByteArrayInputStream(TEST_MESSAGE.getBytes(StandardCharsets.UTF_8));
when(namedPipeHelper.openNamedPipeInputStream(contains("log"), any(Duration.class))).thenReturn(logStream);
ByteArrayOutputStream commandStream = new ByteArrayOutputStream();
when(namedPipeHelper.openNamedPipeOutputStream(contains("command"), any(Duration.class))).thenReturn(commandStream);

NativeController nativeController = new NativeController(NODE_NAME, TestEnvironment.newEnvironment(settings), namedPipeHelper);
nativeController.tailLogsInThread();

// As soon as the log stream ends startProcess should think the native controller has died
assertBusy(() -> {
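The change above swaps the one-byte ByteArrayInputStream for a mocked log stream whose read() blocks until the test finishes, so the controller's log tailer does not hit end-of-stream and treat the controller as dead mid-test. A minimal sketch of that blocking read, assuming the same Mockito matchers imported in this test; the helper class and method names are illustrative.

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.io.InputStream;
import java.util.concurrent.CountDownLatch;

class BlockingLogStreamSketch {

    static InputStream blockingLogStream(CountDownLatch latch) throws Exception {
        InputStream logStream = mock(InputStream.class);
        // Keep the log pipe "open": read() parks until the latch is released,
        // then signals end-of-stream with -1, as in testStartProcessCommand above.
        doAnswer(invocation -> {
            latch.await();
            return -1;
        }).when(logStream).read(any());
        return logStream;
    }
}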
@ -10,6 +10,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectBuilder;
import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;

import java.io.ByteArrayInputStream;
@ -17,6 +18,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@ -30,11 +32,14 @@ import static org.mockito.Mockito.when;
public class ProcessPipesTests extends ESTestCase {

private static final byte[] LOG_BYTES = { 1 };
private static final byte[] LOG_BYTES = ("\n"
+ "{\"logger\":\"controller\",\"timestamp\":1478261151447,\"level\":\"INFO\""
+ ",\"pid\":42,\"thread\":\"0x7fff7d2a8000\",\"message\":\"message 5\",\"class\":\"ml\","
+ "\"method\":\"core::Something\",\"file\":\"Something.cc\",\"line\":555}\n").getBytes(StandardCharsets.UTF_8);
private static final byte[] OUTPUT_BYTES = { 3 };
private static final byte[] PERSIST_BYTES = { 6 };
public void testProcessPipes() throws IOException {
public void testProcessPipes() throws Exception {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = TestEnvironment.newEnvironment(settings);

@ -56,7 +61,7 @@ public class ProcessPipesTests extends ESTestCase {
.thenReturn(new ByteArrayInputStream(PERSIST_BYTES));

ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
true, false, true, true, true, true);
false, true, true, true, true);

List<String> command = new ArrayList<>();
processPipes.addArgs(command);
@ -71,16 +76,21 @@ public class ProcessPipesTests extends ESTestCase {
assertEquals(ProcessPipes.PERSIST_ARG, command.get(7).substring(0, ProcessPipes.PERSIST_ARG.length()));
assertEquals(ProcessPipes.PERSIST_IS_PIPE_ARG, command.get(8));
processPipes.connectStreams(Duration.ofSeconds(2));
processPipes.connectLogStream(Duration.ofSeconds(2));

CppLogMessageHandler logMessageHandler = processPipes.getLogStreamHandler();
assertNotNull(logMessageHandler);
logMessageHandler.tailStream();
assertEquals(42, logMessageHandler.getPid(Duration.ZERO));

processPipes.connectOtherStreams(Duration.ofSeconds(2));

assertTrue(processPipes.getLogStream().isPresent());
assertFalse(processPipes.getCommandStream().isPresent());
assertTrue(processPipes.getProcessInStream().isPresent());
assertTrue(processPipes.getProcessOutStream().isPresent());
assertTrue(processPipes.getRestoreStream().isPresent());
assertTrue(processPipes.getPersistStream().isPresent());

assertEquals(1, processPipes.getLogStream().get().read());
processPipes.getProcessInStream().get().write(2);
byte[] processIn = processInStream.toByteArray();
assertEquals(1, processIn.length);
@ -93,13 +103,13 @@ public class ProcessPipesTests extends ESTestCase {
assertEquals(6, processPipes.getPersistStream().get().read());
}
public void testCloseUnusedPipes_notConnected() throws IOException {
public void testCloseUnusedPipes_notConnected() {
NamedPipeHelper namedPipeHelper = mock(NamedPipeHelper.class);
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = TestEnvironment.newEnvironment(settings);

ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
true, true, true, true, true, true);
new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
true, true, true, true, true);
}

public void testCloseOpenedPipesOnError() throws IOException {
@ -126,9 +136,10 @@ public class ProcessPipesTests extends ESTestCase {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
Environment env = TestEnvironment.newEnvironment(settings);
ProcessPipes processPipes = new ProcessPipes(env, namedPipeHelper, AutodetectBuilder.AUTODETECT, "my_job",
true, true, true, true, true, true);
true, true, true, true, true);

expectThrows(IOException.class, () -> processPipes.connectStreams(Duration.ofSeconds(2)));
processPipes.connectLogStream(Duration.ofSeconds(2));
expectThrows(IOException.class, () -> processPipes.connectOtherStreams(Duration.ofSeconds(2)));

// check the pipes successfully opened were then closed
verify(logStream, times(1)).close();
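Taken together, these test changes exercise the new two-phase connection order: the log pipe is connected and tailed first, and only then are the remaining pipes connected, so the C++ process can never block on a full log buffer while the JVM is still opening pipes. A minimal sketch of that order against the revised ProcessPipes API shown above; the wrapper class, method, and executor/timeout parameters are illustrative, not taken from the commit.

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ExecutorService;

import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;

final class ConnectionOrderSketch {

    static void connect(ProcessPipes pipes, ExecutorService executor, Duration timeout) throws IOException {
        // Connect the log pipe first and start draining it straight away.
        pipes.connectLogStream(timeout);
        CppLogMessageHandler logHandler = pipes.getLogStreamHandler();
        executor.execute(() -> {
            try {
                logHandler.tailStream();
            } catch (IOException e) {
                // Tailing stopped; the native process has gone away or the pipe broke.
            }
        });
        // Anything the C++ side logs while these pipes open is drained by the tailer above.
        pipes.connectOtherStreams(timeout);
    }
}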