Fixed AOBE caused by fetching model state when opening a job.

This error only occurred for jobs that have been opened before and persisted model state.

Closes elastic/elasticsearch#836

Original commit: elastic/x-pack-elasticsearch@ad76f4167f
This commit is contained in:
Martijn van Groningen 2017-01-31 19:52:58 +01:00
parent 7ff3b707a8
commit b07e9bbd07
3 changed files with 41 additions and 34 deletions

View File

@ -49,18 +49,15 @@ import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.MlFilter;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder.BucketsQuery;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelState;
import org.elasticsearch.xpack.ml.notifications.AuditActivity;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder.BucketsQuery;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket;
@ -69,6 +66,9 @@ import org.elasticsearch.xpack.ml.job.results.Influencer;
import org.elasticsearch.xpack.ml.job.results.ModelDebugOutput;
import org.elasticsearch.xpack.ml.job.results.PerPartitionMaxProbabilities;
import org.elasticsearch.xpack.ml.job.results.Result;
import org.elasticsearch.xpack.ml.notifications.AuditActivity;
import org.elasticsearch.xpack.ml.notifications.AuditMessage;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
@ -338,9 +338,9 @@ public class JobProvider {
}));
}
private <T, U> void mget(String indexName, String type, String[] ids, Consumer<Set<T>> handler, Consumer<Exception> errorHandler,
private <T, U> void mget(String indexName, String type, Set<String> ids, Consumer<Set<T>> handler, Consumer<Exception> errorHandler,
BiFunction<XContentParser, U, T> objectParser) {
if (ids.length == 0) {
if (ids.isEmpty()) {
handler.accept(Collections.emptySet());
return;
}
@ -1079,7 +1079,7 @@ public class JobProvider {
*
* @param ids the id of the requested filter
*/
public void getFilters(Consumer<Set<MlFilter>> handler, Consumer<Exception> errorHandler, String... ids) {
public void getFilters(Consumer<Set<MlFilter>> handler, Consumer<Exception> errorHandler, Set<String> ids) {
mget(ML_META_INDEX, MlFilter.TYPE.getPreferredName(), ids, handler, errorHandler, MlFilter.PARSER);
}

View File

@ -34,11 +34,11 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsPa
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater;
import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.IOException;
@ -201,9 +201,9 @@ public class AutodetectProcessManager extends AbstractComponent {
void gatherRequiredInformation(String jobId, TriConsumer handler, Consumer<Exception> errorHandler) {
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
jobProvider.modelSnapshots(jobId, 0, 1, page -> {
ModelSnapshot modelSnapshot = page.results().isEmpty() ? null : page.results().get(1);
ModelSnapshot modelSnapshot = page.results().isEmpty() ? null : page.results().get(0);
jobProvider.getQuantiles(jobId, quantiles -> {
String[] ids = job.getAnalysisConfig().extractReferencedFilters().toArray(new String[0]);
Set<String> ids = job.getAnalysisConfig().extractReferencedFilters();
jobProvider.getFilters(filterDocument -> handler.accept(modelSnapshot, quantiles, filterDocument), errorHandler, ids);
}, errorHandler);
}, errorHandler);

View File

@ -14,6 +14,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MlPlugin;
import org.elasticsearch.xpack.ml.action.UpdateJobStatusAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
@ -31,8 +32,8 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsPa
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.junit.Before;
import org.mockito.Mockito;
@ -60,6 +61,7 @@ import static org.elasticsearch.mock.orig.Mockito.when;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@ -78,6 +80,10 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private JobDataCountsPersister jobDataCountsPersister;
private NormalizerFactory normalizerFactory;
private ModelSnapshot modelSnapshot = new ModelSnapshot("foo");
private Quantiles quantiles = new Quantiles("foo", new Date(), "state");
private Set<MlFilter> filters = new HashSet<>();
@Before
public void initMocks() {
jobManager = mock(JobManager.class);
@ -86,6 +92,26 @@ public class AutodetectProcessManagerTests extends ESTestCase {
jobDataCountsPersister = mock(JobDataCountsPersister.class);
normalizerFactory = mock(NormalizerFactory.class);
givenAllocationWithStatus(JobStatus.OPENED);
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
Consumer<QueryPage<ModelSnapshot>> handler = (Consumer<QueryPage<ModelSnapshot>>) invocationOnMock.getArguments()[3];
handler.accept(new QueryPage<>(Collections.singletonList(modelSnapshot), 1, ModelSnapshot.RESULTS_FIELD));
return null;
}).when(jobProvider).modelSnapshots(any(), anyInt(), anyInt(), any(), any());
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
Consumer<Quantiles> handler = (Consumer<Quantiles>) invocationOnMock.getArguments()[1];
handler.accept(quantiles);
return null;
}).when(jobProvider).getQuantiles(any(), any(), any());
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
Consumer<Set<MlFilter>> handler = (Consumer<Set<MlFilter>>) invocationOnMock.getArguments()[0];
handler.accept(filters);
return null;
}).when(jobProvider).getFilters(any(), any(), any());
}
public void testOpenJob() {
@ -191,7 +217,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
public void testCloseJob() {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
AutodetectProcessManager manager = createManager(communicator);
assertEquals(0, manager.numberOfOpenJobs());
@ -218,7 +243,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
public void testFlush() throws IOException {
AutodetectCommunicator communicator = mock(AutodetectCommunicator.class);
AutodetectProcessManager manager = createManager(communicator);
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(createJobDetails("foo"));
InputStream inputStream = createInputStream("");
manager.openJob("foo", false, e -> {});
@ -267,7 +291,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
Job job = createJobDetails("foo");
when(jobManager.getJobOrThrowIfUnknown("foo")).thenReturn(job);
givenAllocationWithStatus(JobStatus.OPENED);
InputStream inputStream = createInputStream("");
@ -295,16 +318,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectResultsParser parser = mock(AutodetectResultsParser.class);
AutodetectProcess autodetectProcess = mock(AutodetectProcess.class);
AutodetectProcessFactory autodetectProcessFactory = (j, modelSnapshot, quantiles, filters, i, e) -> autodetectProcess;
AutodetectProcessManager manager = spy(new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory));
ModelSnapshot modelSnapshot = new ModelSnapshot("foo");
Quantiles quantiles = new Quantiles("foo", new Date(), "state");
Set<MlFilter> filters = new HashSet<>();
doAnswer(invocationOnMock -> {
AutodetectProcessManager.TriConsumer consumer = (AutodetectProcessManager.TriConsumer) invocationOnMock.getArguments()[1];
consumer.accept(modelSnapshot, quantiles, filters);
return null;
}).when(manager).gatherRequiredInformation(any(), any(), any());
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory);
expectThrows(EsRejectedExecutionException.class, () -> manager.create("my_id", modelSnapshot, quantiles, filters, false, e -> {}));
verify(autodetectProcess, times(1)).close();
@ -328,14 +343,6 @@ public class AutodetectProcessManagerTests extends ESTestCase {
AutodetectProcessManager manager = new AutodetectProcessManager(Settings.EMPTY, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, parser, autodetectProcessFactory, normalizerFactory);
manager = spy(manager);
ModelSnapshot modelSnapshot = new ModelSnapshot("foo");
Quantiles quantiles = new Quantiles("foo", new Date(), "state");
Set<MlFilter> filters = new HashSet<>();
doAnswer(invocationOnMock -> {
AutodetectProcessManager.TriConsumer consumer = (AutodetectProcessManager.TriConsumer) invocationOnMock.getArguments()[1];
consumer.accept(modelSnapshot, quantiles, filters);
return null;
}).when(manager).gatherRequiredInformation(any(), any(), any());
doReturn(communicator).when(manager).create(any(), eq(modelSnapshot), eq(quantiles), eq(filters), anyBoolean(), any());
return manager;
}