* Added dedicated TP for scheduler and interacting with autodetect process. This capped at the number of threads required to run autodetect process times maximum number of jobs allowed to run on a node.
* Added a setting that determines the maximum number of jobs that can run on a single node. * Fail to start autodetect process if a user attempts to start more jobs than is allowed on a single node. * Prevent concurrent data write, flush and close operation to the autodetect process. Original commit: elastic/x-pack-elasticsearch@aca15fd51c
This commit is contained in:
parent
17b3224e03
commit
118abf963b
|
@ -16,7 +16,6 @@ import org.elasticsearch.common.ParseFieldMatcherSupplier;
|
|||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
@ -107,6 +106,8 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
|
|||
public static final String NAME = "prelert";
|
||||
public static final String BASE_PATH = "/_xpack/prelert/";
|
||||
public static final String THREAD_POOL_NAME = NAME;
|
||||
public static final String SCHEDULER_THREAD_POOL_NAME = NAME + "_scheduler";
|
||||
public static final String AUTODETECT_PROCESS_THREAD_POOL_NAME = NAME + "_autodetect_process";
|
||||
|
||||
// NORELEASE - temporary solution
|
||||
static final Setting<Boolean> USE_NATIVE_PROCESS_OPTION = Setting.boolSetting("useNativeProcess", false, Property.NodeScope,
|
||||
|
@ -164,7 +165,7 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
|
|||
throw new ElasticsearchException("Failed to create native process factory", e);
|
||||
}
|
||||
} else {
|
||||
processFactory = (JobDetails, ignoreDowntime) -> new BlackHoleAutodetectProcess();
|
||||
processFactory = (JobDetails, ignoreDowntime, executorService) -> new BlackHoleAutodetectProcess();
|
||||
}
|
||||
AutodetectResultsParser autodetectResultsParser = new AutodetectResultsParser(settings, parseFieldMatcherSupplier);
|
||||
DataProcessor dataProcessor = new AutodetectProcessManager(settings, client, env, threadPool,
|
||||
|
@ -251,8 +252,19 @@ public class PrelertPlugin extends Plugin implements ActionPlugin {
|
|||
|
||||
@Override
|
||||
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
|
||||
final FixedExecutorBuilder builder = new FixedExecutorBuilder(settings, THREAD_POOL_NAME,
|
||||
5 * EsExecutors.boundedNumberOfProcessors(settings), 1000, "xpack.prelert.thread_pool");
|
||||
return Collections.singletonList(builder);
|
||||
int maxNumberOfJobs = AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE.get(settings);
|
||||
FixedExecutorBuilder prelert = new FixedExecutorBuilder(settings, THREAD_POOL_NAME,
|
||||
maxNumberOfJobs, 1000, "xpack.prelert.thread_pool");
|
||||
|
||||
// fail quick to start autodetect process / scheduler, so no queues
|
||||
// 4 threads: for c++ logging, result processing, state processing and restore state
|
||||
FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_PROCESS_THREAD_POOL_NAME,
|
||||
maxNumberOfJobs * 4, 4, "xpack.prelert.autodetect_process_thread_pool");
|
||||
|
||||
// TODO: if scheduled and non scheduled jobs are considered more equal and the scheduler and
|
||||
// autodetect process are created at the same time then these two different TPs can merge.
|
||||
FixedExecutorBuilder scheduler = new FixedExecutorBuilder(settings, SCHEDULER_THREAD_POOL_NAME,
|
||||
maxNumberOfJobs, 1, "xpack.prelert.scheduler_thread_pool");
|
||||
return Arrays.asList(prelert, autoDetect, scheduler);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -242,11 +242,8 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
|
|||
|
||||
@Override
|
||||
protected final void doExecute(Request request, ActionListener<Response> listener) {
|
||||
|
||||
TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build();
|
||||
DataLoadParams params = new DataLoadParams(timeRange, request.isIgnoreDowntime());
|
||||
|
||||
// NORELEASE Make this all async so we don't need to pass off to another thread pool and block
|
||||
threadPool.executor(PrelertPlugin.THREAD_POOL_NAME).execute(() -> {
|
||||
try {
|
||||
DataCounts dataCounts = processManager.processData(request.getJobId(), request.content.streamInput(), params);
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.prelert.PrelertPlugin;
|
||||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
|
||||
|
@ -238,16 +239,20 @@ PostDataFlushAction.RequestBuilder> {
|
|||
|
||||
@Override
|
||||
protected final void doExecute(PostDataFlushAction.Request request, ActionListener<PostDataFlushAction.Response> listener) {
|
||||
|
||||
TimeRange timeRange = TimeRange.builder().startTime(request.getStart()).endTime(request.getEnd()).build();
|
||||
InterimResultsParams params = InterimResultsParams.builder()
|
||||
.calcInterim(request.getCalcInterim())
|
||||
.forTimeRange(timeRange)
|
||||
.advanceTime(request.getAdvanceTime())
|
||||
.build();
|
||||
|
||||
processManager.flushJob(request.getJobId(), params);
|
||||
listener.onResponse(new Response(true));
|
||||
threadPool.executor(PrelertPlugin.THREAD_POOL_NAME).execute(() -> {
|
||||
try {
|
||||
TimeRange timeRange = TimeRange.builder().startTime(request.getStart()).endTime(request.getEnd()).build();
|
||||
InterimResultsParams params = InterimResultsParams.builder()
|
||||
.calcInterim(request.getCalcInterim())
|
||||
.forTimeRange(timeRange)
|
||||
.advanceTime(request.getAdvanceTime())
|
||||
.build();
|
||||
processManager.flushJob(request.getJobId(), params);
|
||||
listener.onResponse(new Response(true));
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,12 +6,17 @@
|
|||
package org.elasticsearch.xpack.prelert.job.manager;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.prelert.PrelertPlugin;
|
||||
import org.elasticsearch.xpack.prelert.job.DataCounts;
|
||||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
import org.elasticsearch.xpack.prelert.job.JobStatus;
|
||||
|
@ -45,12 +50,18 @@ import java.util.Locale;
|
|||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class AutodetectProcessManager extends AbstractComponent implements DataProcessor {
|
||||
|
||||
// TODO (norelease) to be reconsidered
|
||||
public static final Setting<Integer> MAX_RUNNING_JOBS_PER_NODE =
|
||||
Setting.intSetting("max_running_jobs", 10, Setting.Property.NodeScope);
|
||||
|
||||
private final Client client;
|
||||
private final Environment env;
|
||||
private final int maxRunningJobs;
|
||||
private final ThreadPool threadPool;
|
||||
private final JobManager jobManager;
|
||||
private final JobProvider jobProvider;
|
||||
|
@ -66,6 +77,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
|
|||
this.client = client;
|
||||
this.env = env;
|
||||
this.threadPool = threadPool;
|
||||
this.maxRunningJobs = MAX_RUNNING_JOBS_PER_NODE.get(settings);
|
||||
this.parser = parser;
|
||||
this.autodetectProcessFactory = autodetectProcessFactory;
|
||||
this.jobManager = jobManager;
|
||||
|
@ -80,17 +92,11 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
|
|||
return new DataCounts(jobId);
|
||||
}
|
||||
|
||||
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
|
||||
if (communicator == null) {
|
||||
communicator = create(jobId, params.isIgnoreDowntime());
|
||||
autoDetectCommunicatorByJob.put(jobId, communicator);
|
||||
}
|
||||
|
||||
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.computeIfAbsent(jobId, id -> {
|
||||
return create(id, params.isIgnoreDowntime());
|
||||
});
|
||||
try {
|
||||
if (params.isResettingBuckets()) {
|
||||
communicator.writeResetBucketsControlMessage(params);
|
||||
}
|
||||
return communicator.writeToJob(input);
|
||||
return communicator.writeToJob(input, params);
|
||||
// TODO check for errors from autodetect
|
||||
} catch (IOException e) {
|
||||
String msg = String.format(Locale.ROOT, "Exception writing to process for job %s", jobId);
|
||||
|
@ -102,24 +108,41 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
|
|||
}
|
||||
}
|
||||
|
||||
// TODO (norelease) : here we must validate whether we have enough threads in TP in order start analytical process
|
||||
// Otherwise we are not able to communicate via all the named pipes and we can run into deadlock
|
||||
AutodetectCommunicator create(String jobId, boolean ignoreDowntime) {
|
||||
if (autoDetectCommunicatorByJob.size() == maxRunningJobs) {
|
||||
throw new ElasticsearchStatusException("max running job capacity [" + maxRunningJobs + "] reached",
|
||||
RestStatus.TOO_MANY_REQUESTS);
|
||||
}
|
||||
|
||||
// TODO norelease, once we remove black hole process and all persisters are singletons then we can
|
||||
// remove this method and move not enough threads logic to the auto detect process factory
|
||||
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
|
||||
Logger jobLogger = Loggers.getLogger(job.getJobId());
|
||||
// A TP with no queue, so that we fail immediately if there are no threads available
|
||||
ExecutorService executorService = threadPool.executor(PrelertPlugin.AUTODETECT_PROCESS_THREAD_POOL_NAME);
|
||||
|
||||
ElasticsearchUsagePersister usagePersister = new ElasticsearchUsagePersister(client, jobLogger);
|
||||
UsageReporter usageReporter = new UsageReporter(settings, job.getJobId(), usagePersister, jobLogger);
|
||||
|
||||
JobDataCountsPersister jobDataCountsPersister = new ElasticsearchJobDataCountsPersister(client);
|
||||
JobResultsPersister persister = new ElasticsearchPersister(jobId, client);
|
||||
StatusReporter statusReporter = new StatusReporter(env, settings, job.getJobId(), jobProvider.dataCounts(jobId),
|
||||
usageReporter, jobDataCountsPersister, jobLogger, job.getAnalysisConfig().getBucketSpanOrDefault());
|
||||
|
||||
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, ignoreDowntime);
|
||||
JobResultsPersister persister = new ElasticsearchPersister(jobId, client);
|
||||
// TODO Port the normalizer from the old project
|
||||
AutoDetectResultProcessor processor = new AutoDetectResultProcessor(new NoOpRenormaliser(), persister, parser);
|
||||
StateProcessor stateProcessor = new StateProcessor(settings, persister);
|
||||
return new AutodetectCommunicator(threadPool, job, process, jobLogger, statusReporter, processor, stateProcessor);
|
||||
|
||||
AutodetectProcess process = null;
|
||||
try {
|
||||
process = autodetectProcessFactory.createAutodetectProcess(job, ignoreDowntime, executorService);
|
||||
// TODO Port the normalizer from the old project
|
||||
return new AutodetectCommunicator(executorService, job, process, jobLogger, statusReporter, processor, stateProcessor);
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
IOUtils.close(process);
|
||||
} catch (IOException ioe) {
|
||||
logger.error("Can't close autodetect", ioe);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -153,7 +176,7 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
|
|||
@Override
|
||||
public void closeJob(String jobId) {
|
||||
logger.debug("Closing job {}", jobId);
|
||||
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
|
||||
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.remove(jobId);
|
||||
if (communicator == null) {
|
||||
logger.debug("Cannot close: no active autodetect process for job {}", jobId);
|
||||
return;
|
||||
|
@ -161,13 +184,12 @@ public class AutodetectProcessManager extends AbstractComponent implements DataP
|
|||
|
||||
try {
|
||||
communicator.close();
|
||||
setJobFinishedTimeAndStatus(jobId, JobStatus.CLOSED);
|
||||
// TODO check for errors from autodetect
|
||||
// TODO delete associated files (model config etc)
|
||||
} catch (IOException e) {
|
||||
logger.info("Exception closing stopped process input stream", e);
|
||||
} finally {
|
||||
autoDetectCommunicatorByJob.remove(jobId);
|
||||
setJobFinishedTimeAndStatus(jobId, JobStatus.CLOSED);
|
||||
} catch (Exception e) {
|
||||
logger.warn("Exception closing stopped process input stream", e);
|
||||
throw ExceptionsHelper.serverError("Exception closing stopped process input stream", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,8 +6,8 @@
|
|||
package org.elasticsearch.xpack.prelert.job.process.autodetect;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.prelert.PrelertPlugin;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
|
||||
import org.elasticsearch.xpack.prelert.job.DataCounts;
|
||||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
|
@ -30,37 +30,41 @@ import java.io.InputStream;
|
|||
import java.time.Duration;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class AutodetectCommunicator implements Closeable {
|
||||
|
||||
private static final int DEFAULT_TRY_COUNT = 5;
|
||||
private static final int DEFAULT_TRY_TIMEOUT_SECS = 6;
|
||||
|
||||
private final String jobId;
|
||||
private final Logger jobLogger;
|
||||
private final StatusReporter statusReporter;
|
||||
private final AutodetectProcess autodetectProcess;
|
||||
private final DataToProcessWriter autoDetectWriter;
|
||||
private final AutoDetectResultProcessor autoDetectResultProcessor;
|
||||
|
||||
public AutodetectCommunicator(ThreadPool threadPool, Job job, AutodetectProcess process, Logger jobLogger,
|
||||
final AtomicBoolean inUse = new AtomicBoolean(false);
|
||||
|
||||
public AutodetectCommunicator(ExecutorService autoDetectExecutor, Job job, AutodetectProcess process, Logger jobLogger,
|
||||
StatusReporter statusReporter, AutoDetectResultProcessor autoDetectResultProcessor,
|
||||
StateProcessor stateParser) {
|
||||
StateProcessor stateProcessor) {
|
||||
this.jobId = job.getJobId();
|
||||
this.autodetectProcess = process;
|
||||
this.jobLogger = jobLogger;
|
||||
this.statusReporter = statusReporter;
|
||||
this.autoDetectResultProcessor = autoDetectResultProcessor;
|
||||
|
||||
// TODO norelease: prevent that we fail to start any of the required threads for interacting with analytical process:
|
||||
// We should before we start the analytical process (and scheduler) verify that have enough threads.
|
||||
AnalysisConfig analysisConfig = job.getAnalysisConfig();
|
||||
boolean usePerPartitionNormalization = analysisConfig.getUsePerPartitionNormalization();
|
||||
threadPool.executor(PrelertPlugin.THREAD_POOL_NAME).execute(() -> {
|
||||
this.autoDetectResultProcessor.process(jobLogger, process.getProcessOutStream(), usePerPartitionNormalization);
|
||||
});
|
||||
threadPool.executor(PrelertPlugin.THREAD_POOL_NAME).execute(() ->
|
||||
stateParser.process(job.getId(), process.getPersistStream())
|
||||
autoDetectExecutor.execute(() ->
|
||||
autoDetectResultProcessor.process(jobLogger, process.getProcessOutStream(), usePerPartitionNormalization)
|
||||
);
|
||||
autoDetectExecutor.execute(() ->
|
||||
stateProcessor.process(job.getId(), process.getPersistStream())
|
||||
);
|
||||
|
||||
this.autoDetectWriter = createProcessWriter(job, process, statusReporter);
|
||||
}
|
||||
|
||||
|
@ -69,29 +73,32 @@ public class AutodetectCommunicator implements Closeable {
|
|||
job.getSchedulerConfig(), new TransformConfigs(job.getTransforms()) , statusReporter, jobLogger);
|
||||
}
|
||||
|
||||
public DataCounts writeToJob(InputStream inputStream) throws IOException {
|
||||
checkProcessIsAlive();
|
||||
CountingInputStream countingStream = new CountingInputStream(inputStream, statusReporter);
|
||||
DataCounts results = autoDetectWriter.write(countingStream);
|
||||
autoDetectWriter.flush();
|
||||
return results;
|
||||
public DataCounts writeToJob(InputStream inputStream, DataLoadParams params) throws IOException {
|
||||
return checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPLOAD, jobId), () -> {
|
||||
if (params.isResettingBuckets()) {
|
||||
autodetectProcess.writeResetBucketsControlMessage(params);
|
||||
}
|
||||
CountingInputStream countingStream = new CountingInputStream(inputStream, statusReporter);
|
||||
DataCounts results = autoDetectWriter.write(countingStream);
|
||||
autoDetectWriter.flush();
|
||||
return results;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
checkProcessIsAlive();
|
||||
autodetectProcess.close();
|
||||
autoDetectResultProcessor.awaitCompletion();
|
||||
}
|
||||
|
||||
public void writeResetBucketsControlMessage(DataLoadParams params) throws IOException {
|
||||
checkProcessIsAlive();
|
||||
autodetectProcess.writeResetBucketsControlMessage(params);
|
||||
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, jobId), () -> {
|
||||
autodetectProcess.close();
|
||||
autoDetectResultProcessor.awaitCompletion();
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public void writeUpdateConfigMessage(String config) throws IOException {
|
||||
checkProcessIsAlive();
|
||||
autodetectProcess.writeUpdateConfigMessage(config);
|
||||
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_UPDATE, jobId), () -> {
|
||||
autodetectProcess.writeUpdateConfigMessage(config);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public void flushJob(InterimResultsParams params) throws IOException {
|
||||
|
@ -99,33 +106,36 @@ public class AutodetectCommunicator implements Closeable {
|
|||
}
|
||||
|
||||
void flushJob(InterimResultsParams params, int tryCount, int tryTimeoutSecs) throws IOException {
|
||||
String flushId = autodetectProcess.flushJob(params);
|
||||
checkAndRun(false, () -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_FLUSH, jobId), () -> {
|
||||
int tryCountCounter = tryCount;
|
||||
String flushId = autodetectProcess.flushJob(params);
|
||||
|
||||
// TODO: norelease: I think waiting once 30 seconds will have the same effect as 5 * 6 seconds.
|
||||
// So we may want to remove this retry logic here
|
||||
Duration intermittentTimeout = Duration.ofSeconds(tryTimeoutSecs);
|
||||
boolean isFlushComplete = false;
|
||||
while (isFlushComplete == false && --tryCount >= 0) {
|
||||
// Check there wasn't an error in the flush
|
||||
if (!autodetectProcess.isProcessAlive()) {
|
||||
// TODO: norelease: I think waiting once 30 seconds will have the same effect as 5 * 6 seconds.
|
||||
// So we may want to remove this retry logic here
|
||||
Duration intermittentTimeout = Duration.ofSeconds(tryTimeoutSecs);
|
||||
boolean isFlushComplete = false;
|
||||
while (isFlushComplete == false && --tryCountCounter >= 0) {
|
||||
// Check there wasn't an error in the flush
|
||||
if (!autodetectProcess.isProcessAlive()) {
|
||||
String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_UNEXPTECTED_DEATH) + " " + autodetectProcess.readError();
|
||||
jobLogger.error(msg);
|
||||
throw ExceptionsHelper.serverError(msg);
|
||||
}
|
||||
isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, intermittentTimeout);
|
||||
jobLogger.info("isFlushComplete={}", isFlushComplete);
|
||||
}
|
||||
|
||||
String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_UNEXPTECTED_DEATH) + " " + autodetectProcess.readError();
|
||||
if (!isFlushComplete) {
|
||||
String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_TIMEOUT) + " " + autodetectProcess.readError();
|
||||
jobLogger.error(msg);
|
||||
throw ExceptionsHelper.serverError(msg);
|
||||
}
|
||||
isFlushComplete = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, intermittentTimeout);
|
||||
jobLogger.info("isFlushComplete={}", isFlushComplete);
|
||||
}
|
||||
|
||||
if (!isFlushComplete) {
|
||||
String msg = Messages.getMessage(Messages.AUTODETECT_FLUSH_TIMEOUT) + " " + autodetectProcess.readError();
|
||||
jobLogger.error(msg);
|
||||
throw ExceptionsHelper.serverError(msg);
|
||||
}
|
||||
|
||||
// We also have to wait for the normaliser to become idle so that we block
|
||||
// clients from querying results in the middle of normalisation.
|
||||
autoDetectResultProcessor.waitUntilRenormaliserIsIdle();
|
||||
// We also have to wait for the normaliser to become idle so that we block
|
||||
// clients from querying results in the middle of normalisation.
|
||||
autoDetectResultProcessor.waitUntilRenormaliserIsIdle();
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -150,4 +160,30 @@ public class AutodetectCommunicator implements Closeable {
|
|||
public Optional<DataCounts> getDataCounts() {
|
||||
return Optional.ofNullable(statusReporter.runningTotalStats());
|
||||
}
|
||||
|
||||
private <T> T checkAndRun(Supplier<String> errorMessage, Callback<T> callback) throws IOException {
|
||||
return checkAndRun(true, errorMessage, callback);
|
||||
}
|
||||
|
||||
private <T> T checkAndRun(boolean checkIsAlive, Supplier<String> errorMessage, Callback<T> callback) throws IOException {
|
||||
if (inUse.compareAndSet(false, true)) {
|
||||
try {
|
||||
if (checkIsAlive) {
|
||||
checkProcessIsAlive();
|
||||
}
|
||||
return callback.run();
|
||||
} finally {
|
||||
inUse.set(false);
|
||||
}
|
||||
} else {
|
||||
throw new ElasticsearchStatusException(errorMessage.get(), RestStatus.TOO_MANY_REQUESTS);
|
||||
}
|
||||
}
|
||||
|
||||
private interface Callback<T> {
|
||||
|
||||
T run() throws IOException;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.prelert.job.process.autodetect;
|
|||
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.time.ZonedDateTime;
|
||||
|
@ -15,7 +16,7 @@ import java.time.ZonedDateTime;
|
|||
/**
|
||||
* Interface representing the native C++ autodetect process
|
||||
*/
|
||||
public interface AutodetectProcess {
|
||||
public interface AutodetectProcess extends Closeable {
|
||||
|
||||
/**
|
||||
* Write the record to autodetect. The record parameter should not be encoded
|
||||
|
@ -58,11 +59,6 @@ public interface AutodetectProcess {
|
|||
*/
|
||||
void flushStream() throws IOException;
|
||||
|
||||
/**
|
||||
* Close
|
||||
*/
|
||||
void close() throws IOException;
|
||||
|
||||
/**
|
||||
* Autodetect's output stream
|
||||
* @return output stream
|
||||
|
|
|
@ -7,6 +7,8 @@ package org.elasticsearch.xpack.prelert.job.process.autodetect;
|
|||
|
||||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
/**
|
||||
* Factory interface for creating implementations of {@link AutodetectProcess}
|
||||
*/
|
||||
|
@ -16,7 +18,8 @@ public interface AutodetectProcessFactory {
|
|||
*
|
||||
* @param job Job configuration for the analysis process
|
||||
* @param ignoreDowntime Should gaps in data be treated as anomalous or as a maintenance window after a job re-start
|
||||
* @param executorService Executor service used to start the async tasks a job needs to operate the analytical process
|
||||
* @return The process
|
||||
*/
|
||||
AutodetectProcess createAutodetectProcess(Job job, boolean ignoreDowntime);
|
||||
AutodetectProcess createAutodetectProcess(Job job, boolean ignoreDowntime, ExecutorService executorService);
|
||||
}
|
||||
|
|
|
@ -44,8 +44,14 @@ public class BlackHoleAutodetectProcess implements AutodetectProcess, Closeable
|
|||
processOutStream = new PipedInputStream();
|
||||
persistStream = new PipedInputStream();
|
||||
try {
|
||||
// jackson tries to read the first 4 bytes:
|
||||
// if we don't do this the autodetect communication would fail starting
|
||||
pipedProcessOutStream = new PipedOutputStream(processOutStream);
|
||||
pipedProcessOutStream.write(' ');
|
||||
pipedProcessOutStream.write(' ');
|
||||
pipedProcessOutStream.write(' ');
|
||||
pipedProcessOutStream.write('[');
|
||||
pipedProcessOutStream.flush();
|
||||
pipedPersistStream = new PipedOutputStream(persistStream);
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Error connecting PipedOutputStream", e);
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.elasticsearch.xpack.prelert.job.process.autodetect;
|
|||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.xpack.prelert.job.logging.CppLogMessageHandler;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
|
||||
|
@ -22,11 +23,16 @@ import java.nio.file.Files;
|
|||
import java.nio.file.Path;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* Autodetect process using native code.
|
||||
*/
|
||||
public class NativeAutodetectProcess implements AutodetectProcess {
|
||||
class NativeAutodetectProcess implements AutodetectProcess {
|
||||
private static final Logger LOGGER = Loggers.getLogger(NativeAutodetectProcess.class);
|
||||
|
||||
private final CppLogMessageHandler cppLogHandler;
|
||||
|
@ -37,11 +43,11 @@ public class NativeAutodetectProcess implements AutodetectProcess {
|
|||
private final ZonedDateTime startTime;
|
||||
private final int numberOfAnalysisFields;
|
||||
private final List<Path> filesToDelete;
|
||||
private Thread logTailThread;
|
||||
private Future<?> logTailThread;
|
||||
|
||||
public NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream,
|
||||
InputStream processOutStream, InputStream persistStream,
|
||||
int numberOfAnalysisFields, List<Path> filesToDelete) {
|
||||
NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream,
|
||||
InputStream persistStream, int numberOfAnalysisFields, List<Path> filesToDelete,
|
||||
ExecutorService executorService) throws EsRejectedExecutionException {
|
||||
cppLogHandler = new CppLogMessageHandler(jobId, logStream);
|
||||
this.processInStream = new BufferedOutputStream(processInStream);
|
||||
this.processOutStream = processOutStream;
|
||||
|
@ -50,18 +56,13 @@ public class NativeAutodetectProcess implements AutodetectProcess {
|
|||
startTime = ZonedDateTime.now();
|
||||
this.numberOfAnalysisFields = numberOfAnalysisFields;
|
||||
this.filesToDelete = filesToDelete;
|
||||
}
|
||||
|
||||
void tailLogsInThread() {
|
||||
logTailThread = new Thread(() -> {
|
||||
try {
|
||||
cppLogHandler.tailStream();
|
||||
cppLogHandler.close();
|
||||
logTailThread = executorService.submit(() -> {
|
||||
try (CppLogMessageHandler h = cppLogHandler) {
|
||||
h.tailStream();
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Error tailing C++ process logs", e);
|
||||
}
|
||||
});
|
||||
logTailThread.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -98,16 +99,15 @@ public class NativeAutodetectProcess implements AutodetectProcess {
|
|||
try {
|
||||
// closing its input causes the process to exit
|
||||
processInStream.close();
|
||||
|
||||
// wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger
|
||||
if (logTailThread != null) {
|
||||
logTailThread.join();
|
||||
}
|
||||
|
||||
// this may take a long time as it persists the model state
|
||||
logTailThread.get(30, TimeUnit.MINUTES);
|
||||
if (cppLogHandler.seenFatalError()) {
|
||||
throw ExceptionsHelper.serverError(cppLogHandler.getErrors());
|
||||
}
|
||||
LOGGER.info("Process exited");
|
||||
} catch (ExecutionException | TimeoutException e) {
|
||||
LOGGER.warn("Exception closing the running native process", e);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.warn("Exception closing the running native process");
|
||||
Thread.currentThread().interrupt();
|
||||
|
|
|
@ -6,9 +6,12 @@
|
|||
package org.elasticsearch.xpack.prelert.job.process.autodetect;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
import org.elasticsearch.xpack.prelert.job.ModelSnapshot;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
|
||||
|
@ -30,16 +33,19 @@ import java.util.List;
|
|||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
public class NativeAutodetectProcessFactory implements AutodetectProcessFactory {
|
||||
|
||||
private static final Logger LOGGER = Loggers.getLogger(NativeAutodetectProcessFactory.class);
|
||||
private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper();
|
||||
private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(2);
|
||||
|
||||
private final Environment env;
|
||||
private final Settings settings;
|
||||
private final JobProvider jobProvider;
|
||||
private Environment env;
|
||||
private Settings settings;
|
||||
private NativeController nativeController;
|
||||
private final NativeController nativeController;
|
||||
|
||||
public NativeAutodetectProcessFactory(JobProvider jobProvider, Environment env, Settings settings, NativeController nativeController) {
|
||||
this.env = Objects.requireNonNull(env);
|
||||
|
@ -49,7 +55,7 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
|
|||
}
|
||||
|
||||
@Override
|
||||
public AutodetectProcess createAutodetectProcess(Job job, boolean ignoreDowntime) {
|
||||
public AutodetectProcess createAutodetectProcess(Job job, boolean ignoreDowntime, ExecutorService executorService) {
|
||||
List<Path> filesToDelete = new ArrayList<>();
|
||||
List<ModelSnapshot> modelSnapshots = jobProvider.modelSnapshots(job.getId(), 0, 1).results();
|
||||
ModelSnapshot modelSnapshot = (modelSnapshots != null && !modelSnapshots.isEmpty()) ? modelSnapshots.get(0) : null;
|
||||
|
@ -58,32 +64,32 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
|
|||
true, false, true, true, modelSnapshot != null, !ProcessCtrl.DONT_PERSIST_MODEL_STATE_SETTING.get(settings));
|
||||
createNativeProcess(job, processPipes, ignoreDowntime, filesToDelete);
|
||||
int numberOfAnalysisFields = job.getAnalysisConfig().analysisFields().size();
|
||||
NativeAutodetectProcess autodetect = new NativeAutodetectProcess(job.getId(), processPipes.getLogStream().get(),
|
||||
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(),
|
||||
processPipes.getPersistStream().get(), numberOfAnalysisFields, filesToDelete);
|
||||
autodetect.tailLogsInThread();
|
||||
if (modelSnapshot != null) {
|
||||
restoreStateInThread(job.getId(), modelSnapshot, processPipes.getRestoreStream().get());
|
||||
}
|
||||
return autodetect;
|
||||
}
|
||||
|
||||
private void restoreStateInThread(String jobId, ModelSnapshot modelSnapshot, OutputStream restoreStream) {
|
||||
new Thread(() -> {
|
||||
try {
|
||||
jobProvider.restoreStateToStream(jobId, modelSnapshot, restoreStream);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error restoring model state for job " + jobId, e);
|
||||
NativeAutodetectProcess autodetect = null;
|
||||
try {
|
||||
autodetect = new NativeAutodetectProcess(job.getId(), processPipes.getLogStream().get(),
|
||||
processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(),
|
||||
processPipes.getPersistStream().get(), numberOfAnalysisFields, filesToDelete, executorService);
|
||||
if (modelSnapshot != null) {
|
||||
// TODO (norelease): I don't think we should do this in the background. If this happens then we should wait
|
||||
// until restore it is done before we can accept data.
|
||||
executorService.execute(() -> {
|
||||
try (OutputStream r = processPipes.getRestoreStream().get()) {
|
||||
jobProvider.restoreStateToStream(job.getJobId(), modelSnapshot, r);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error restoring model state for job " + job.getId(), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
// The restore stream will not be needed again. If an error occurred getting state to restore then
|
||||
// it's critical to close the restore stream so that the C++ code can realise that it will never
|
||||
// receive any state to restore. If restoration went smoothly then this is just good practice.
|
||||
return autodetect;
|
||||
} catch (EsRejectedExecutionException e) {
|
||||
try {
|
||||
restoreStream.close();
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Error closing restore stream for job " + jobId, e);
|
||||
IOUtils.close(autodetect);
|
||||
} catch (IOException ioe) {
|
||||
LOGGER.error("Can't close autodetect", ioe);
|
||||
}
|
||||
}).start();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private void createNativeProcess(Job job, ProcessPipes processPipes, boolean ignoreDowntime, List<Path> filesToDelete) {
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.time.Duration;
|
|||
import java.util.Locale;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* A runnable class that reads the autodetect process output
|
||||
|
@ -37,7 +38,7 @@ public class AutoDetectResultProcessor {
|
|||
private final JobResultsPersister persister;
|
||||
private final AutodetectResultsParser parser;
|
||||
|
||||
private final CountDownLatch latch = new CountDownLatch(1);
|
||||
private final CountDownLatch completionLatch = new CountDownLatch(1);
|
||||
private final FlushListener flushListener;
|
||||
|
||||
private volatile ModelSizeStats latestModelSizeStats;
|
||||
|
@ -72,7 +73,7 @@ public class AutoDetectResultProcessor {
|
|||
} catch (Exception e) {
|
||||
jobLogger.info("Error parsing autodetect output", e);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
completionLatch.countDown();
|
||||
flushListener.clear();
|
||||
renormaliser.shutdown(jobLogger);
|
||||
}
|
||||
|
@ -152,7 +153,7 @@ public class AutoDetectResultProcessor {
|
|||
|
||||
public void awaitCompletion() {
|
||||
try {
|
||||
latch.await();
|
||||
completionLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException(e);
|
||||
|
|
|
@ -77,7 +77,7 @@ public class ScheduledJobService extends AbstractComponent {
|
|||
Holder holder = createJobScheduler(job);
|
||||
registry.put(job.getId(), holder);
|
||||
|
||||
threadPool.executor(PrelertPlugin.THREAD_POOL_NAME).execute(() -> {
|
||||
threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME).execute(() -> {
|
||||
try {
|
||||
Long next = holder.scheduledJob.runLookBack(allocation.getSchedulerState());
|
||||
if (next != null) {
|
||||
|
@ -136,7 +136,7 @@ public class ScheduledJobService extends AbstractComponent {
|
|||
if (holder.scheduledJob.isRunning()) {
|
||||
TimeValue delay = computeNextDelay(delayInMsSinceEpoch);
|
||||
logger.debug("Waiting [{}] before executing next realtime import for job [{}]", delay, jobId);
|
||||
threadPool.schedule(delay, PrelertPlugin.THREAD_POOL_NAME, () -> {
|
||||
threadPool.schedule(delay, PrelertPlugin.SCHEDULER_THREAD_POOL_NAME, () -> {
|
||||
long nextDelayInMsSinceEpoch;
|
||||
try {
|
||||
nextDelayInMsSinceEpoch = holder.scheduledJob.runRealtime();
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.action.bulk.BulkResponse;
|
|||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
|
@ -55,28 +56,7 @@ public class ScheduledJobsIT extends ESIntegTestCase {
|
|||
|
||||
@After
|
||||
public void clearPrelertMetadata() throws Exception {
|
||||
MetaData metaData = client().admin().cluster().prepareState().get().getState().getMetaData();
|
||||
PrelertMetadata prelertMetadata = metaData.custom(PrelertMetadata.TYPE);
|
||||
for (Map.Entry<String, Job> entry : prelertMetadata.getJobs().entrySet()) {
|
||||
String jobId = entry.getKey();
|
||||
try {
|
||||
StopJobSchedulerAction.Response response =
|
||||
client().execute(StopJobSchedulerAction.INSTANCE, new StopJobSchedulerAction.Request(jobId)).get();
|
||||
assertTrue(response.isAcknowledged());
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
try {
|
||||
PostDataCloseAction.Response response =
|
||||
client().execute(PostDataCloseAction.INSTANCE, new PostDataCloseAction.Request(jobId)).get();
|
||||
assertTrue(response.isAcknowledged());
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
DeleteJobAction.Response response =
|
||||
client().execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId)).get();
|
||||
assertTrue(response.isAcknowledged());
|
||||
}
|
||||
clearPrelertMetadata(client());
|
||||
}
|
||||
|
||||
public void testLookbackOnly() throws Exception {
|
||||
|
@ -208,4 +188,29 @@ public class ScheduledJobsIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public static void clearPrelertMetadata(Client client) throws Exception {
|
||||
MetaData metaData = client.admin().cluster().prepareState().get().getState().getMetaData();
|
||||
PrelertMetadata prelertMetadata = metaData.custom(PrelertMetadata.TYPE);
|
||||
for (Map.Entry<String, Job> entry : prelertMetadata.getJobs().entrySet()) {
|
||||
String jobId = entry.getKey();
|
||||
try {
|
||||
StopJobSchedulerAction.Response response =
|
||||
client.execute(StopJobSchedulerAction.INSTANCE, new StopJobSchedulerAction.Request(jobId)).get();
|
||||
assertTrue(response.isAcknowledged());
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
try {
|
||||
PostDataCloseAction.Response response =
|
||||
client.execute(PostDataCloseAction.INSTANCE, new PostDataCloseAction.Request(jobId)).get();
|
||||
assertTrue(response.isAcknowledged());
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
DeleteJobAction.Response response =
|
||||
client.execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId)).get();
|
||||
assertTrue(response.isAcknowledged());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.prelert.integration;
|
||||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.xpack.prelert.PrelertPlugin;
|
||||
import org.elasticsearch.xpack.prelert.action.PostDataAction;
|
||||
import org.elasticsearch.xpack.prelert.action.PutJobAction;
|
||||
import org.elasticsearch.xpack.prelert.action.ScheduledJobsIT;
|
||||
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
|
||||
import org.elasticsearch.xpack.prelert.job.DataDescription;
|
||||
import org.elasticsearch.xpack.prelert.job.Detector;
|
||||
import org.elasticsearch.xpack.prelert.job.Job;
|
||||
import org.junit.After;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(numDataNodes = 1)
|
||||
public class TooManyJobsIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Collections.singleton(PrelertPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
|
||||
return nodePlugins();
|
||||
}
|
||||
|
||||
@After
|
||||
public void clearPrelertMetadata() throws Exception {
|
||||
ScheduledJobsIT.clearPrelertMetadata(client());
|
||||
}
|
||||
|
||||
public void testCannotStartTooManyAnalyticalProcesses() throws Exception {
|
||||
String jsonLine = "{\"time\": \"0\"}";
|
||||
int maxNumJobs = 1000;
|
||||
for (int i = 1; i <= maxNumJobs; i++) {
|
||||
Job.Builder job = createJob(Integer.toString(i));
|
||||
PutJobAction.Request putJobRequest = new PutJobAction.Request(job.build(true));
|
||||
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
|
||||
assertTrue(putJobResponse.isAcknowledged());
|
||||
|
||||
// triggers creating autodetect process:
|
||||
PostDataAction.Request postDataRequest = new PostDataAction.Request(job.getId());
|
||||
postDataRequest.setContent(new BytesArray(jsonLine));
|
||||
try {
|
||||
PostDataAction.Response postDataResponse = client().execute(PostDataAction.INSTANCE, postDataRequest).get();
|
||||
assertEquals(1, postDataResponse.getDataCounts().getInputRecordCount());
|
||||
logger.info("Posted data {} times", i);
|
||||
} catch (Exception e) {
|
||||
Throwable cause = ExceptionsHelper.unwrapCause(e.getCause());
|
||||
assertEquals(ElasticsearchStatusException.class, cause.getClass());
|
||||
assertEquals(RestStatus.TOO_MANY_REQUESTS, ((ElasticsearchStatusException) cause).status());
|
||||
logger.info("good news everybody --> reached threadpool capacity after starting {}th analytical process", i);
|
||||
|
||||
// now manually clean things up and see if we can succeed to start one new job
|
||||
clearPrelertMetadata();
|
||||
putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
|
||||
assertTrue(putJobResponse.isAcknowledged());
|
||||
PostDataAction.Response postDataResponse = client().execute(PostDataAction.INSTANCE, postDataRequest).get();
|
||||
assertEquals(1, postDataResponse.getDataCounts().getInputRecordCount());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
fail("shouldn't be able to add [" + maxNumJobs + "] jobs");
|
||||
}
|
||||
|
||||
private Job.Builder createJob(String id) {
|
||||
DataDescription.Builder dataDescription = new DataDescription.Builder();
|
||||
dataDescription.setFormat(DataDescription.DataFormat.JSON);
|
||||
dataDescription.setTimeFormat(DataDescription.EPOCH_MS);
|
||||
|
||||
Detector.Builder d = new Detector.Builder("count", null);
|
||||
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build()));
|
||||
|
||||
Job.Builder builder = new Job.Builder();
|
||||
builder.setId(id);
|
||||
|
||||
builder.setAnalysisConfig(analysisConfig);
|
||||
builder.setDataDescription(dataDescription);
|
||||
return builder;
|
||||
}
|
||||
|
||||
}
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.prelert.job.manager;
|
|||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -20,6 +21,7 @@ import org.elasticsearch.xpack.prelert.job.JobStatus;
|
|||
import org.elasticsearch.xpack.prelert.job.metadata.Allocation;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectCommunicator;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcess;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.AutodetectProcessFactory;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.output.AutodetectResultsParser;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
|
||||
|
@ -33,13 +35,16 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import static org.elasticsearch.mock.orig.Mockito.doThrow;
|
||||
import static org.elasticsearch.mock.orig.Mockito.times;
|
||||
import static org.elasticsearch.mock.orig.Mockito.verify;
|
||||
import static org.elasticsearch.mock.orig.Mockito.when;
|
||||
import static org.hamcrest.core.IsEqual.equalTo;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
@ -75,11 +80,12 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
AutodetectCommunicator communicator = Mockito.mock(AutodetectCommunicator.class);
|
||||
AutodetectProcessManager manager = createManager(communicator);
|
||||
|
||||
DataLoadParams params = mock(DataLoadParams.class);
|
||||
InputStream inputStream = createInputStream("");
|
||||
doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream);
|
||||
doThrow(new IOException("blah")).when(communicator).writeToJob(inputStream, params);
|
||||
|
||||
ESTestCase.expectThrows(ElasticsearchException.class,
|
||||
() -> manager.processData("foo", inputStream, mock(DataLoadParams.class)));
|
||||
() -> manager.processData("foo", inputStream, params));
|
||||
}
|
||||
|
||||
public void testCloseJob() {
|
||||
|
@ -103,9 +109,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), true);
|
||||
InputStream inputStream = createInputStream("");
|
||||
manager.processData("foo", inputStream, params);
|
||||
|
||||
verify(communicator).writeResetBucketsControlMessage(params);
|
||||
verify(communicator).writeToJob(inputStream);
|
||||
verify(communicator).writeToJob(inputStream, params);
|
||||
}
|
||||
|
||||
public void testFlush() throws IOException {
|
||||
|
@ -178,6 +182,26 @@ public class AutodetectProcessManagerTests extends ESTestCase {
|
|||
assertThat(dataCounts, equalTo(new DataCounts("foo")));
|
||||
}
|
||||
|
||||
public void testCreate_notEnoughThreads() throws IOException {
|
||||
Client client = mock(Client.class);
|
||||
Environment environment = mock(Environment.class);
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ExecutorService executorService = mock(ExecutorService.class);
|
||||
doThrow(new EsRejectedExecutionException("")).when(executorService).execute(any());
|
||||
when(threadPool.executor(anyString())).thenReturn(executorService);
|
||||
when(jobManager.getJobOrThrowIfUnknown("_id")).thenReturn(createJobDetails("_id"));
|
||||
when(jobProvider.dataCounts("_id")).thenReturn(new DataCounts("_id"));
|
||||
|
||||
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
|
||||
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
|
||||
AutodetectProcessFactory autodetectProcessFactory = (j, i, e) -> autodetectProcess;
|
||||
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, environment, threadPool,
|
||||
jobManager, jobProvider, parser, autodetectProcessFactory);
|
||||
|
||||
expectThrows(EsRejectedExecutionException.class, () -> manager.create("_id", false));
|
||||
verify(autodetectProcess, times(1)).close();
|
||||
}
|
||||
|
||||
private void givenAllocationWithStatus(JobStatus status) {
|
||||
Allocation.Builder allocation = new Allocation.Builder();
|
||||
allocation.setStatus(status);
|
||||
|
|
|
@ -7,9 +7,8 @@ package org.elasticsearch.xpack.prelert.job.process.autodetect;
|
|||
|
||||
import org.apache.logging.log4j.core.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.prelert.PrelertPlugin;
|
||||
import org.elasticsearch.xpack.prelert.job.AnalysisConfig;
|
||||
import org.elasticsearch.xpack.prelert.job.DataDescription;
|
||||
import org.elasticsearch.xpack.prelert.job.Detector;
|
||||
|
@ -23,13 +22,18 @@ import org.elasticsearch.xpack.prelert.job.process.autodetect.params.TimeRange;
|
|||
import org.elasticsearch.xpack.prelert.job.status.StatusReporter;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
|
||||
import static org.elasticsearch.mock.orig.Mockito.never;
|
||||
import static org.elasticsearch.mock.orig.Mockito.times;
|
||||
import static org.elasticsearch.mock.orig.Mockito.verify;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -40,7 +44,7 @@ public class AutodetectCommunicatorTests extends ESTestCase {
|
|||
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("2").build());
|
||||
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
|
||||
try (AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class))) {
|
||||
communicator.writeResetBucketsControlMessage(params);
|
||||
communicator.writeToJob(new ByteArrayInputStream(new byte[0]), params);
|
||||
Mockito.verify(process).writeResetBucketsControlMessage(params);
|
||||
}
|
||||
}
|
||||
|
@ -123,19 +127,70 @@ public class AutodetectCommunicatorTests extends ESTestCase {
|
|||
|
||||
private AutodetectCommunicator createAutodetectCommunicator(AutodetectProcess autodetectProcess,
|
||||
AutoDetectResultProcessor autoDetectResultProcessor) throws IOException {
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ExecutorService executorService = Mockito.mock(ExecutorService.class);
|
||||
ExecutorService executorService = mock(ExecutorService.class);
|
||||
doAnswer(invocation -> {
|
||||
((Runnable) invocation.getArguments()[0]).run();
|
||||
return null;
|
||||
}).when(executorService).execute(any(Runnable.class));
|
||||
Mockito.when(threadPool.executor(PrelertPlugin.THREAD_POOL_NAME)).thenReturn(executorService);
|
||||
Logger jobLogger = Mockito.mock(Logger.class);
|
||||
JobResultsPersister resultsPersister = mock(JobResultsPersister.class);
|
||||
StatusReporter statusReporter = mock(StatusReporter.class);
|
||||
StateProcessor stateProcessor = mock(StateProcessor.class);
|
||||
return new AutodetectCommunicator(threadPool, createJobDetails(), autodetectProcess, jobLogger,
|
||||
statusReporter, autoDetectResultProcessor, stateProcessor);
|
||||
return new AutodetectCommunicator(executorService, createJobDetails(), autodetectProcess, jobLogger, statusReporter,
|
||||
autoDetectResultProcessor, stateProcessor);
|
||||
}
|
||||
|
||||
public void testWriteToJobInUse() throws IOException {
|
||||
InputStream in = mock(InputStream.class);
|
||||
when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
|
||||
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
|
||||
AutodetectCommunicator communicator = createAutodetectCommunicator(process, mock(AutoDetectResultProcessor.class));
|
||||
|
||||
communicator.inUse.set(true);
|
||||
expectThrows(ElasticsearchStatusException.class, () -> communicator.writeToJob(in, mock(DataLoadParams.class)));
|
||||
|
||||
communicator.inUse.set(false);
|
||||
communicator.writeToJob(in, mock(DataLoadParams.class));
|
||||
}
|
||||
|
||||
public void testFlushInUse() throws IOException {
|
||||
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
|
||||
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class);
|
||||
when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true);
|
||||
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
|
||||
|
||||
communicator.inUse.set(true);
|
||||
InterimResultsParams params = mock(InterimResultsParams.class);
|
||||
expectThrows(ElasticsearchStatusException.class, () -> communicator.flushJob(params));
|
||||
|
||||
communicator.inUse.set(false);
|
||||
communicator.flushJob(params);
|
||||
}
|
||||
|
||||
public void testCloseInUse() throws IOException {
|
||||
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
|
||||
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class);
|
||||
when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true);
|
||||
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
|
||||
|
||||
communicator.inUse.set(true);
|
||||
expectThrows(ElasticsearchStatusException.class, communicator::close);
|
||||
|
||||
communicator.inUse.set(false);
|
||||
communicator.close();
|
||||
}
|
||||
|
||||
public void testWriteUpdateConfigMessageInUse() throws Exception {
|
||||
AutodetectProcess process = mockAutodetectProcessWithOutputStream();
|
||||
AutoDetectResultProcessor resultProcessor = mock(AutoDetectResultProcessor.class);
|
||||
when(resultProcessor.waitForFlushAcknowledgement(any(), any())).thenReturn(true);
|
||||
AutodetectCommunicator communicator = createAutodetectCommunicator(process, resultProcessor);
|
||||
|
||||
communicator.inUse.set(true);
|
||||
expectThrows(ElasticsearchStatusException.class, () -> communicator.writeUpdateConfigMessage(""));
|
||||
|
||||
communicator.inUse.set(false);
|
||||
communicator.writeUpdateConfigMessage("");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.prelert.job.process.autodetect;
|
||||
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.DataLoadParams;
|
||||
import org.elasticsearch.xpack.prelert.job.process.autodetect.params.InterimResultsParams;
|
||||
|
@ -25,15 +26,18 @@ import java.util.Collections;
|
|||
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class NativeAutodetectProcessTests extends ESTestCase {
|
||||
|
||||
private static final int NUMBER_ANALYSIS_FIELDS = 3;
|
||||
|
||||
public void testProcessStartTime() throws InterruptedException {
|
||||
NativeAutodetectProcess process = new NativeAutodetectProcess("foo", Mockito.mock(InputStream.class),
|
||||
public void testProcessStartTime() throws Exception {
|
||||
InputStream logStream = Mockito.mock(InputStream.class);
|
||||
when(logStream.read(new byte[1024])).thenReturn(-1);
|
||||
NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
|
||||
Mockito.mock(OutputStream.class), Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
|
||||
NUMBER_ANALYSIS_FIELDS, null);
|
||||
NUMBER_ANALYSIS_FIELDS, null, EsExecutors.newDirectExecutorService());
|
||||
|
||||
ZonedDateTime startTime = process.getProcessStartTime();
|
||||
Thread.sleep(500);
|
||||
|
@ -45,13 +49,13 @@ public class NativeAutodetectProcessTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testWriteRecord() throws IOException {
|
||||
InputStream logStream = Mockito.mock(InputStream.class);
|
||||
when(logStream.read(new byte[1024])).thenReturn(-1);
|
||||
String[] record = {"r1", "r2", "r3", "r4", "r5"};
|
||||
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
|
||||
|
||||
NativeAutodetectProcess process = new NativeAutodetectProcess("foo", Mockito.mock(InputStream.class),
|
||||
NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
|
||||
bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
|
||||
NUMBER_ANALYSIS_FIELDS, Collections.emptyList());
|
||||
NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService());
|
||||
|
||||
process.writeRecord(record);
|
||||
process.flushStream();
|
||||
|
@ -76,10 +80,12 @@ public class NativeAutodetectProcessTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testFlush() throws IOException {
|
||||
InputStream logStream = Mockito.mock(InputStream.class);
|
||||
when(logStream.read(new byte[1024])).thenReturn(-1);
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream(ControlMsgToProcessWriter.FLUSH_SPACES_LENGTH + 1024);
|
||||
NativeAutodetectProcess process = new NativeAutodetectProcess("foo", Mockito.mock(InputStream.class),
|
||||
NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
|
||||
bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
|
||||
NUMBER_ANALYSIS_FIELDS, Collections.emptyList());
|
||||
NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService());
|
||||
|
||||
InterimResultsParams params = InterimResultsParams.builder().build();
|
||||
process.flushJob(params);
|
||||
|
@ -89,10 +95,12 @@ public class NativeAutodetectProcessTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testWriteResetBucketsControlMessage() throws IOException {
|
||||
InputStream logStream = Mockito.mock(InputStream.class);
|
||||
when(logStream.read(new byte[1024])).thenReturn(-1);
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
|
||||
NativeAutodetectProcess process = new NativeAutodetectProcess("foo", Mockito.mock(InputStream.class),
|
||||
NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
|
||||
bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
|
||||
NUMBER_ANALYSIS_FIELDS, Collections.emptyList());
|
||||
NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService());
|
||||
|
||||
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1").endTime("86400").build(), true);
|
||||
process.writeResetBucketsControlMessage(params);
|
||||
|
@ -103,10 +111,12 @@ public class NativeAutodetectProcessTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testWriteUpdateConfigMessage() throws IOException {
|
||||
InputStream logStream = Mockito.mock(InputStream.class);
|
||||
when(logStream.read(new byte[1024])).thenReturn(-1);
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
|
||||
NativeAutodetectProcess process = new NativeAutodetectProcess("foo", Mockito.mock(InputStream.class),
|
||||
NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream,
|
||||
bos, Mockito.mock(InputStream.class), Mockito.mock(InputStream.class),
|
||||
NUMBER_ANALYSIS_FIELDS, Collections.emptyList());
|
||||
NUMBER_ANALYSIS_FIELDS, Collections.emptyList(), EsExecutors.newDirectExecutorService());
|
||||
|
||||
process.writeUpdateConfigMessage("");
|
||||
process.flushStream();
|
||||
|
|
|
@ -81,7 +81,7 @@ public class ScheduledJobServiceTests extends ESTestCase {
|
|||
((Runnable) invocation.getArguments()[0]).run();
|
||||
return null;
|
||||
}).when(executorService).execute(any(Runnable.class));
|
||||
when(threadPool.executor(PrelertPlugin.THREAD_POOL_NAME)).thenReturn(executorService);
|
||||
when(threadPool.executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME)).thenReturn(executorService);
|
||||
|
||||
scheduledJobService =
|
||||
new ScheduledJobService(threadPool, client, jobProvider, dataProcessor, dataExtractorFactory, () -> currentTime);
|
||||
|
@ -108,7 +108,7 @@ public class ScheduledJobServiceTests extends ESTestCase {
|
|||
scheduledJobService.start(builder.build(), allocation);
|
||||
|
||||
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
|
||||
verify(threadPool, times(1)).executor(PrelertPlugin.THREAD_POOL_NAME);
|
||||
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME);
|
||||
verify(threadPool, never()).schedule(any(), any(), any());
|
||||
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STARTED)), any());
|
||||
verify(client).execute(same(INSTANCE), eq(new Request("foo", JobSchedulerStatus.STOPPING)), any());
|
||||
|
@ -130,8 +130,8 @@ public class ScheduledJobServiceTests extends ESTestCase {
|
|||
scheduledJobService.start(builder.build(), allocation);
|
||||
|
||||
verify(dataExtractor).newSearch(eq(0L), eq(60000L), any());
|
||||
verify(threadPool, times(1)).executor(PrelertPlugin.THREAD_POOL_NAME);
|
||||
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.THREAD_POOL_NAME), any());
|
||||
verify(threadPool, times(1)).executor(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME);
|
||||
verify(threadPool, times(1)).schedule(eq(new TimeValue(480100)), eq(PrelertPlugin.SCHEDULER_THREAD_POOL_NAME), any());
|
||||
|
||||
allocation = new Allocation(allocation.getNodeId(), allocation.getJobId(), allocation.getStatus(),
|
||||
new SchedulerState(JobSchedulerStatus.STOPPING, 0L, 60000L));
|
||||
|
|
Loading…
Reference in New Issue