[ML] Use allocation id as key in `AutodetectProcessManager#autoDetectCommunicatorByJob` map instead of job id.

Relates to elastic/x-pack-elasticsearch#921

Original commit: elastic/x-pack-elasticsearch@21383fd51c
This commit is contained in:
Martijn van Groningen 2017-04-26 19:33:30 +02:00
parent 1f57a53b3e
commit 295a4049a3
9 changed files with 134 additions and 93 deletions

View File

@ -268,7 +268,7 @@ public class FlushJobAction extends Action<FlushJobAction.Request, FlushJobActio
timeRangeBuilder.endTime(request.getEnd());
}
paramsBuilder.forTimeRange(timeRangeBuilder.build());
processManager.flushJob(request.getJobId(), paramsBuilder.build(), e -> {
processManager.flushJob(task, paramsBuilder.build(), e -> {
if (e == null) {
listener.onResponse(new Response(true));
} else {

View File

@ -413,13 +413,13 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Request, GetJo
logger.debug("Get stats for job [{}]", jobId);
ClusterState state = clusterService.state();
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Optional<Tuple<DataCounts, ModelSizeStats>> stats = processManager.getStatistics(jobId);
Optional<Tuple<DataCounts, ModelSizeStats>> stats = processManager.getStatistics(task);
if (stats.isPresent()) {
PersistentTask<?> pTask = MlMetadata.getJobTask(jobId, tasks);
DiscoveryNode node = state.nodes().get(pTask.getExecutorNode());
JobState jobState = MlMetadata.getJobState(jobId, tasks);
String assignmentExplanation = pTask.getAssignment().getExplanation();
TimeValue openTime = durationToTimeValue(processManager.jobOpenTime(jobId));
TimeValue openTime = durationToTimeValue(processManager.jobOpenTime(task));
Response.JobStats jobStats = new Response.JobStats(jobId, stats.get().v1(), stats.get().v2(), jobState,
node, assignmentExplanation, openTime);
listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD));

View File

@ -341,7 +341,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
}
void closeJob(String reason) {
autodetectProcessManager.closeJob(jobId, false, reason);
autodetectProcessManager.closeJob(this, false, reason);
}
static boolean match(Task task, String expectedJobId) {
@ -515,7 +515,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
protected void nodeOperation(AllocatedPersistentTask task, JobParams params) {
JobTask jobTask = (JobTask) task;
jobTask.autodetectProcessManager = autodetectProcessManager;
autodetectProcessManager.openJob(params.getJobId(), jobTask, params.isIgnoreDowntime(), e2 -> {
autodetectProcessManager.openJob(jobTask, params.isIgnoreDowntime(), e2 -> {
if (e2 == null) {
task.markAsCompleted();
} else {

View File

@ -255,8 +255,7 @@ public class PostDataAction extends Action<PostDataAction.Request, PostDataActio
TimeRange timeRange = TimeRange.builder().startTime(request.getResetStart()).endTime(request.getResetEnd()).build();
DataLoadParams params = new DataLoadParams(timeRange, Optional.ofNullable(request.getDataDescription()));
try {
processManager.processData(request.getJobId(),
request.content.streamInput(), request.getXContentType(), params, (dataCounts, e) -> {
processManager.processData(task, request.content.streamInput(), request.getXContentType(), params, (dataCounts, e) -> {
if (dataCounts != null) {
listener.onResponse(new Response(dataCounts));
} else {

View File

@ -199,7 +199,7 @@ public class UpdateProcessAction extends
@Override
protected void taskOperation(Request request, OpenJobAction.JobTask task, ActionListener<Response> listener) {
try {
processManager.writeUpdateProcessMessage(request.getJobId(), request.getDetectorUpdates(),
processManager.writeUpdateProcessMessage(task, request.getDetectorUpdates(),
request.getModelPlotConfig(), e -> {
if (e == null) {
listener.onResponse(new Response());

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobUpdate;
@ -34,7 +35,6 @@ import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@ -51,6 +51,7 @@ public class AutodetectCommunicator implements Closeable {
private static final Duration FLUSH_PROCESS_CHECK_FREQUENCY = Duration.ofSeconds(1);
private final Job job;
private final JobTask jobTask;
private final DataCountsReporter dataCountsReporter;
private final AutodetectProcess autodetectProcess;
private final AutoDetectResultProcessor autoDetectResultProcessor;
@ -58,11 +59,11 @@ public class AutodetectCommunicator implements Closeable {
private final ExecutorService autodetectWorkerExecutor;
private final NamedXContentRegistry xContentRegistry;
AutodetectCommunicator(Job job, AutodetectProcess process,
DataCountsReporter dataCountsReporter,
AutoDetectResultProcessor autoDetectResultProcessor, Consumer<Exception> handler,
NamedXContentRegistry xContentRegistry, ExecutorService autodetectWorkerExecutor) {
AutodetectCommunicator(Job job, JobTask jobTask, AutodetectProcess process, DataCountsReporter dataCountsReporter,
AutoDetectResultProcessor autoDetectResultProcessor, Consumer<Exception> handler,
NamedXContentRegistry xContentRegistry, ExecutorService autodetectWorkerExecutor) {
this.job = job;
this.jobTask = jobTask;
this.autodetectProcess = process;
this.dataCountsReporter = dataCountsReporter;
this.autoDetectResultProcessor = autoDetectResultProcessor;
@ -202,6 +203,10 @@ public class AutodetectCommunicator implements Closeable {
}
}
public JobTask getJobTask() {
return jobTask;
}
public ZonedDateTime getProcessStartTime() {
return autodetectProcess.getProcessStartTime();
}

View File

@ -94,7 +94,7 @@ public class AutodetectProcessManager extends AbstractComponent {
private final JobResultsPersister jobResultsPersister;
private final JobDataCountsPersister jobDataCountsPersister;
private final ConcurrentMap<String, AutodetectCommunicator> autoDetectCommunicatorByJob;
private final ConcurrentMap<Long, AutodetectCommunicator> autoDetectCommunicatorByJob;
private final int maxAllowedRunningJobs;
@ -128,8 +128,8 @@ public class AutodetectProcessManager extends AbstractComponent {
logger.info("Closing [{}] jobs, because [{}]", numJobs, reason);
}
for (Map.Entry<String, AutodetectCommunicator> entry : autoDetectCommunicatorByJob.entrySet()) {
closeJob(entry.getKey(), false, reason);
for (Map.Entry<Long, AutodetectCommunicator> entry : autoDetectCommunicatorByJob.entrySet()) {
closeJob(entry.getValue().getJobTask(), false, reason);
}
}
@ -146,17 +146,17 @@ public class AutodetectProcessManager extends AbstractComponent {
* <li>If a high proportion of the records chronologically out of order</li>
* </ol>
*
* @param jobId the jobId
* @param input Data input stream
* @param jobTask The job task
* @param input Data input stream
* @param xContentType the {@link XContentType} of the input
* @param params Data processing parameters
* @param handler Delegate error or datacount results (Count of records, fields, bytes, etc written)
* @param params Data processing parameters
* @param handler Delegate error or datacount results (Count of records, fields, bytes, etc written)
*/
public void processData(String jobId, InputStream input, XContentType xContentType,
public void processData(JobTask jobTask, InputStream input, XContentType xContentType,
DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobTask.getAllocationId());
if (communicator == null) {
throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobId + "] is not open");
throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + "] is not open");
}
communicator.writeToJob(input, xContentType, params, handler);
}
@ -166,15 +166,15 @@ public class AutodetectProcessManager extends AbstractComponent {
* opportunity to process all data previously sent to it with none left
* sitting in buffers.
*
* @param jobId The job to flush
* @param params Parameters about whether interim results calculation
* should occur and for which period of time
* @param jobTask The job task
* @param params Parameters about whether interim results calculation
* should occur and for which period of time
*/
public void flushJob(String jobId, InterimResultsParams params, Consumer<Exception> handler) {
logger.debug("Flushing job {}", jobId);
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
public void flushJob(JobTask jobTask, InterimResultsParams params, Consumer<Exception> handler) {
logger.debug("Flushing job {}", jobTask.getJobId());
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobTask.getAllocationId());
if (communicator == null) {
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobId);
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId());
logger.debug(message);
throw ExceptionsHelper.conflictStatusException(message);
}
@ -183,18 +183,18 @@ public class AutodetectProcessManager extends AbstractComponent {
if (e == null) {
handler.accept(null);
} else {
String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobId);
String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobTask.getJobId());
logger.error(msg);
handler.accept(ExceptionsHelper.serverError(msg, e));
}
});
}
public void writeUpdateProcessMessage(String jobId, List<JobUpdate.DetectorUpdate> updates, ModelPlotConfig config,
public void writeUpdateProcessMessage(JobTask jobTask, List<JobUpdate.DetectorUpdate> updates, ModelPlotConfig config,
Consumer<Exception> handler) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobTask.getAllocationId());
if (communicator == null) {
String message = "Cannot process update model debug config because job [" + jobId + "] is not open";
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open";
logger.debug(message);
handler.accept(ExceptionsHelper.conflictStatusException(message));
return;
@ -208,7 +208,8 @@ public class AutodetectProcessManager extends AbstractComponent {
});
}
public void openJob(String jobId, JobTask jobTask, boolean ignoreDowntime, Consumer<Exception> handler) {
public void openJob(JobTask jobTask, boolean ignoreDowntime, Consumer<Exception> handler) {
String jobId = jobTask.getJobId();
logger.info("Opening job [{}]", jobId);
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
jobProvider.getAutodetectParams(job, params -> {
@ -222,8 +223,8 @@ public class AutodetectProcessManager extends AbstractComponent {
@Override
protected void doRun() throws Exception {
try {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.computeIfAbsent(jobId, id ->
create(id, jobTask, params, ignoreDowntime, handler));
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.computeIfAbsent(jobTask.getAllocationId(),
id -> create(jobTask, params, ignoreDowntime, handler));
communicator.writeJobInputHeader();
setJobState(jobTask, JobState.OPENED);
} catch (Exception e1) {
@ -243,13 +244,14 @@ public class AutodetectProcessManager extends AbstractComponent {
});
}
AutodetectCommunicator create(String jobId, JobTask jobTask, AutodetectParams autodetectParams,
AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams,
boolean ignoreDowntime, Consumer<Exception> handler) {
if (autoDetectCommunicatorByJob.size() == maxAllowedRunningJobs) {
throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached",
RestStatus.TOO_MANY_REQUESTS);
}
String jobId = jobTask.getJobId();
notifyLoadingSnapshot(jobId, autodetectParams);
if (autodetectParams.dataCounts().getProcessedRecordCount() > 0) {
@ -296,8 +298,8 @@ public class AutodetectProcessManager extends AbstractComponent {
}
throw e;
}
return new AutodetectCommunicator(job, process, dataCountsReporter, processor,
handler, xContentRegistry, autodetectWorkerExecutor);
return new AutodetectCommunicator(job, jobTask, process, dataCountsReporter, processor, handler, xContentRegistry,
autodetectWorkerExecutor);
}
@ -327,22 +329,22 @@ public class AutodetectProcessManager extends AbstractComponent {
/**
* Stop the running job and mark it as finished.<br>
*
* @param jobId The job to stop
* @param jobTask The job to stop
* @param restart Whether the job should be restarted by persistent tasks
* @param reason The reason for closing the job
*/
public void closeJob(String jobId, boolean restart, String reason) {
logger.debug("Attempting to close job [{}], because [{}]", jobId, reason);
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.remove(jobId);
public void closeJob(JobTask jobTask, boolean restart, String reason) {
logger.debug("Attempting to close job [{}], because [{}]", jobTask, reason);
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.remove(jobTask.getAllocationId());
if (communicator == null) {
logger.debug("Cannot close: no active autodetect process for job {}", jobId);
logger.debug("Cannot close: no active autodetect process for job {}", jobTask);
return;
}
if (reason == null) {
logger.info("Closing job [{}]", jobId);
logger.info("Closing job [{}]", jobTask);
} else {
logger.info("Closing job [{}], because [{}]", jobId, reason);
logger.info("Closing job [{}], because [{}]", jobTask, reason);
}
try {
@ -357,12 +359,12 @@ public class AutodetectProcessManager extends AbstractComponent {
return autoDetectCommunicatorByJob.size();
}
boolean jobHasActiveAutodetectProcess(String jobId) {
return autoDetectCommunicatorByJob.get(jobId) != null;
boolean jobHasActiveAutodetectProcess(JobTask jobTask) {
return autoDetectCommunicatorByJob.get(jobTask.getAllocationId()) != null;
}
public Optional<Duration> jobOpenTime(String jobId) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
public Optional<Duration> jobOpenTime(JobTask jobTask) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobTask.getAllocationId());
if (communicator == null) {
return Optional.empty();
}
@ -407,8 +409,8 @@ public class AutodetectProcessManager extends AbstractComponent {
});
}
public Optional<Tuple<DataCounts, ModelSizeStats>> getStatistics(String jobId) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobId);
public Optional<Tuple<DataCounts, ModelSizeStats>> getStatistics(JobTask jobTask) {
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.get(jobTask.getAllocationId());
if (communicator == null) {
return Optional.empty();
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.OpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
@ -138,7 +139,9 @@ public class AutodetectCommunicatorTests extends ESTestCase {
((ActionListener<Boolean>) invocation.getArguments()[0]).onResponse(true);
return null;
}).when(dataCountsReporter).finishReporting(any());
return new AutodetectCommunicator(createJobDetails(), autodetectProcess,
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
return new AutodetectCommunicator(createJobDetails(), jobTask, autodetectProcess,
dataCountsReporter, autoDetectResultProcessor, e -> {
}, new NamedXContentRegistry(Collections.emptyList()), executorService);
}

View File

@ -76,8 +76,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
/**
* Calling the
* {@link AutodetectProcessManager#processData(String, InputStream, XContentType, DataLoadParams, BiConsumer)}
* Calling the * {@link AutodetectProcessManager#processData(JobTask, InputStream, XContentType, DataLoadParams, BiConsumer)}
* method causes an AutodetectCommunicator to be created on demand. Most of
* these tests have to do that before they can assert other things
*/
@ -122,10 +121,11 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createManager(communicator, client);
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
when(jobTask.getAllocationId()).thenReturn(1L);
manager.openJob("foo", jobTask, false, e -> {});
manager.openJob(jobTask, false, e -> {});
assertEquals(1, manager.numberOfOpenJobs());
assertTrue(manager.jobHasActiveAutodetectProcess("foo"));
assertTrue(manager.jobHasActiveAutodetectProcess(jobTask));
verify(jobTask).updatePersistentStatus(eq(new JobTaskStatus(JobState.OPENED, 1L)), any());
}
@ -164,19 +164,32 @@ public class AutodetectProcessManagerTests extends ESTestCase {
}).when(manager).setJobState(any(), eq(JobState.FAILED), any());
JobTask jobTask = mock(JobTask.class);
manager.openJob("foo", jobTask, false, e -> {});
manager.openJob("bar", jobTask, false, e -> {});
manager.openJob("baz", jobTask, false, e -> {});
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("bar");
when(jobTask.getAllocationId()).thenReturn(1L);
manager.openJob(jobTask, false, e -> {});
jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("baz");
when(jobTask.getAllocationId()).thenReturn(2L);
manager.openJob(jobTask, false, e -> {});
assertEquals(3, manager.numberOfOpenJobs());
Exception[] holder = new Exception[1];
manager.openJob("foobar", jobTask, false, e -> holder[0] = e);
jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foobar");
when(jobTask.getAllocationId()).thenReturn(3L);
manager.openJob(jobTask, false, e -> holder[0] = e);
Exception e = holder[0];
assertEquals("max running job capacity [3] reached", e.getMessage());
manager.closeJob("baz", false, null);
jobTask = mock(JobTask.class);
when(jobTask.getAllocationId()).thenReturn(2L);
when(jobTask.getJobId()).thenReturn("baz");
manager.closeJob(jobTask, false, null);
assertEquals(2, manager.numberOfOpenJobs());
manager.openJob("foobar", jobTask, false, e1 -> {});
manager.openJob(jobTask, false, e1 -> {});
assertEquals(3, manager.numberOfOpenJobs());
}
@ -186,9 +199,10 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertEquals(0, manager.numberOfOpenJobs());
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
DataLoadParams params = new DataLoadParams(TimeRange.builder().build(), Optional.empty());
manager.openJob("foo", jobTask, false, e -> {});
manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()),
manager.openJob(jobTask, false, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
params, (dataCounts1, e) -> {});
assertEquals(1, manager.numberOfOpenJobs());
}
@ -208,9 +222,10 @@ public class AutodetectProcessManagerTests extends ESTestCase {
JobTask jobTask = mock(JobTask.class);
manager.openJob("foo", jobTask, false, e -> {});
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
Exception[] holder = new Exception[1];
manager.processData("foo", inputStream, xContentType, params, (dataCounts1, e) -> holder[0] = e);
manager.processData(jobTask, inputStream, xContentType, params, (dataCounts1, e) -> holder[0] = e);
assertNotNull(holder[0]);
}
@ -220,13 +235,14 @@ public class AutodetectProcessManagerTests extends ESTestCase {
assertEquals(0, manager.numberOfOpenJobs());
JobTask jobTask = mock(JobTask.class);
manager.openJob("foo", jobTask, false, e -> {});
manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()),
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
// job is created
assertEquals(1, manager.numberOfOpenJobs());
manager.closeJob("foo", false, null);
manager.closeJob(jobTask, false, null);
assertEquals(0, manager.numberOfOpenJobs());
}
@ -238,8 +254,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
DataLoadParams params = new DataLoadParams(TimeRange.builder().startTime("1000").endTime("2000").build(), Optional.empty());
InputStream inputStream = createInputStream("");
JobTask jobTask = mock(JobTask.class);
manager.openJob("foo", jobTask, false, e -> {});
manager.processData("foo", inputStream, xContentType, params, (dataCounts1, e) -> {});
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.processData(jobTask, inputStream, xContentType, params, (dataCounts1, e) -> {});
verify(communicator).writeToJob(same(inputStream), same(xContentType), same(params), any());
}
@ -248,13 +265,14 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createManager(communicator);
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
InputStream inputStream = createInputStream("");
manager.openJob("foo", jobTask, false, e -> {});
manager.processData("foo", inputStream, randomFrom(XContentType.values()),
manager.openJob(jobTask, false, e -> {});
manager.processData(jobTask, inputStream, randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
InterimResultsParams params = InterimResultsParams.builder().build();
manager.flushJob("foo", params, e -> {});
manager.flushJob(jobTask, params, e -> {});
verify(communicator).flushJob(same(params), any());
}
@ -270,8 +288,10 @@ public class AutodetectProcessManagerTests extends ESTestCase {
return null;
}).when(communicator).flushJob(same(params), any());
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
Exception[] holder = new Exception[1];
manager.flushJob("foo", params, e -> holder[0] = e);
manager.flushJob(jobTask, params, e -> holder[0] = e);
assertEquals("[foo] exception while flushing job", holder[0].getMessage());
}
@ -281,22 +301,28 @@ public class AutodetectProcessManagerTests extends ESTestCase {
ModelPlotConfig modelConfig = mock(ModelPlotConfig.class);
List<DetectionRule> rules = Collections.singletonList(mock(DetectionRule.class));
List<JobUpdate.DetectorUpdate> detectorUpdates = Collections.singletonList(new JobUpdate.DetectorUpdate(2, null, rules));
manager.writeUpdateProcessMessage("foo", detectorUpdates, modelConfig, e -> {});
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("foo");
manager.writeUpdateProcessMessage(jobTask, detectorUpdates, modelConfig, e -> {});
verify(communicator).writeUpdateProcessMessage(same(modelConfig), same(detectorUpdates), any());
}
public void testJobHasActiveAutodetectProcess() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManager(communicator);
assertFalse(manager.jobHasActiveAutodetectProcess("foo"));
JobTask jobTask = mock(JobTask.class);
manager.openJob("foo", jobTask, false, e -> {});
manager.processData("foo", createInputStream(""), randomFrom(XContentType.values()),
when(jobTask.getJobId()).thenReturn("foo");
assertFalse(manager.jobHasActiveAutodetectProcess(jobTask));
manager.openJob(jobTask, false, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts1, e) -> {});
assertTrue(manager.jobHasActiveAutodetectProcess("foo"));
assertFalse(manager.jobHasActiveAutodetectProcess("bar"));
assertTrue(manager.jobHasActiveAutodetectProcess(jobTask));
jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("bar");
when(jobTask.getAllocationId()).thenReturn(1L);
assertFalse(manager.jobHasActiveAutodetectProcess(jobTask));
}
public void testProcessData_GivenStateNotOpened() throws IOException {
@ -310,10 +336,11 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createManager(communicator);
JobTask jobTask = mock(JobTask.class);
manager.openJob("foo", jobTask, false, e -> {});
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
InputStream inputStream = createInputStream("");
DataCounts[] dataCounts = new DataCounts[1];
manager.processData("foo", inputStream,
manager.processData(jobTask, inputStream,
randomFrom(XContentType.values()), mock(DataLoadParams.class), (dataCounts1, e) -> {
dataCounts[0] = dataCounts1;
});
@ -337,8 +364,9 @@ public class AutodetectProcessManagerTests extends ESTestCase {
normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor);
JobTask jobTask = mock(JobTask.class);
when(jobTask.getJobId()).thenReturn("my_id");
expectThrows(EsRejectedExecutionException.class,
() -> manager.create("my_id", jobTask, buildAutodetectParams(), false, e -> {}));
() -> manager.create(jobTask, buildAutodetectParams(), false, e -> {}));
verify(autodetectProcess, times(1)).close();
}
@ -347,7 +375,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createNonSpyManager("foo");
JobTask jobTask = mock(JobTask.class);
manager.create("foo", jobTask, buildAutodetectParams(), false, e -> {});
when(jobTask.getJobId()).thenReturn("foo");
manager.create(jobTask, buildAutodetectParams(), false, e -> {});
String expectedNotification = "Loading model snapshot [N/A], job latest_record_timestamp [N/A]";
verify(auditor).info("foo", expectedNotification);
@ -362,7 +391,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createNonSpyManager("foo");
JobTask jobTask = mock(JobTask.class);
manager.create("foo", jobTask, buildAutodetectParams(), false, e -> {});
when(jobTask.getJobId()).thenReturn("foo");
manager.create(jobTask, buildAutodetectParams(), false, e -> {});
String expectedNotification = "Loading model snapshot [snapshot-1] with " +
"latest_record_timestamp [1970-01-01T00:00:00.000Z], " +
@ -380,7 +410,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = createNonSpyManager("foo");
JobTask jobTask = mock(JobTask.class);
manager.create("foo", jobTask, buildAutodetectParams(), false, e -> {});
when(jobTask.getJobId()).thenReturn("foo");
manager.create(jobTask, buildAutodetectParams(), false, e -> {});
String expectedNotification = "Loading model snapshot [N/A], " +
"job latest_record_timestamp [1970-01-01T00:00:00.000Z]";
@ -429,15 +460,16 @@ public class AutodetectProcessManagerTests extends ESTestCase {
autodetectProcessFactory, normalizerFactory,
new NamedXContentRegistry(Collections.emptyList()), auditor);
manager = spy(manager);
doReturn(communicator).when(manager).create(any(), any(), eq(buildAutodetectParams()), anyBoolean(), any());
doReturn(communicator).when(manager).create(any(), eq(buildAutodetectParams()), anyBoolean(), any());
return manager;
}
private AutodetectProcessManager createManagerAndCallProcessData(AutodetectCommunicator communicator, String jobId) {
AutodetectProcessManager manager = createManager(communicator);
JobTask jobTask = mock(JobTask.class);
manager.openJob(jobId, jobTask, false, e -> {});
manager.processData(jobId, createInputStream(""), randomFrom(XContentType.values()),
when(jobTask.getJobId()).thenReturn("foo");
manager.openJob(jobTask, false, e -> {});
manager.processData(jobTask, createInputStream(""), randomFrom(XContentType.values()),
mock(DataLoadParams.class), (dataCounts, e) -> {});
return manager;
}