From cb29cbd8a9dff5d6be021523293ec31d47de30a4 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Tue, 14 Feb 2017 14:59:28 -0500 Subject: [PATCH] [ML] Allow result indices to be shared. (elastic/x-pack-elasticsearch#555) Essentially an update to https://github.com/elastic/prelert-legacy/pull/736 . Still does not default to using shared indices, but adds the capability for two jobs to share the same one without conflict Still does not default to using shared indices, just adds the capability for two jobs to share the same one without conflict. Original commit: elastic/x-pack-elasticsearch@60d93a06eaa688f988c00105fb3fd60b1be1afb7 --- .../xpack/ml/action/PutJobAction.java | 2 +- .../xpack/ml/job/JobManager.java | 11 +- .../xpack/ml/job/persistence/JobProvider.java | 46 ++++-- .../AutodetectResultProcessorIT.java | 24 ++- .../xpack/ml/integration/MlJobIT.java | 55 +++++-- .../xpack/ml/job/JobManagerTests.java | 54 ------- .../ml/job/persistence/JobProviderTests.java | 149 +++++++++++++++++- 7 files changed, 248 insertions(+), 93 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutJobAction.java index f6525873012..8e7c6e4b82b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PutJobAction.java @@ -202,7 +202,7 @@ public class PutJobAction extends Action listener) throws Exception { - jobManager.putJob(request, listener); + jobManager.putJob(request, state, listener); } @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 689dcfd28d2..51505927585 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -162,11 +162,11 @@ public class JobManager extends AbstractComponent { /** * Stores a job in the cluster state */ - public void putJob(PutJobAction.Request request, ActionListener actionListener) { + public void putJob(PutJobAction.Request request, ClusterState state, ActionListener actionListener) { Job job = request.getJob(); ActionListener createResultsIndexListener = ActionListener.wrap(jobSaved -> - jobProvider.createJobResultIndex(job, new ActionListener() { + jobProvider.createJobResultIndex(job, state, new ActionListener() { @Override public void onResponse(Boolean indicesCreated) { audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED)); @@ -193,12 +193,7 @@ public class JobManager extends AbstractComponent { @Override public ClusterState execute(ClusterState currentState) throws Exception { - ClusterState cs = updateClusterState(job, false, currentState); - if (currentState.metaData().index(AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName())) != null) { - throw new ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_INDEX_ALREADY_EXISTS, - AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName()))); - } - return cs; + return updateClusterState(job, false, currentState); } }); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 8917ea9a6f6..182bfc6613b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -22,6 +23,7 @@ import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Strings; @@ -235,7 +237,7 @@ public class JobProvider { /** * Create the Elasticsearch index and the mappings */ - public void createJobResultIndex(Job job, ActionListener listener) { + public void createJobResultIndex(Job job, ClusterState state, ActionListener listener) { Collection termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList(); try { XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping(termFields); @@ -247,14 +249,6 @@ public class JobProvider { boolean createIndexAlias = !job.getIndexName().equals(job.getId()); String indexName = AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName()); - LOGGER.trace("ES API CALL: create index {}", indexName); - CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); - createIndexRequest.settings(mlResultsIndexSettings()); - createIndexRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping); - createIndexRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping); - createIndexRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping); - createIndexRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping); - if (createIndexAlias) { final ActionListener responseListener = listener; listener = ActionListener.wrap(aBoolean -> { @@ -265,9 +259,37 @@ public class JobProvider { listener::onFailure); } - final ActionListener createdListener = listener; - client.admin().indices().create(createIndexRequest, - ActionListener.wrap(r -> createdListener.onResponse(true), createdListener::onFailure)); + // Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if + // already in the CS + if (!state.getMetaData().hasIndex(indexName)) { + LOGGER.trace("ES API CALL: create index {}", indexName); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); + createIndexRequest.settings(mlResultsIndexSettings()); + createIndexRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping); + createIndexRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping); + createIndexRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping); + createIndexRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping); + + final ActionListener createdListener = listener; + client.admin().indices().create(createIndexRequest, + ActionListener.wrap(r -> createdListener.onResponse(true), + e -> { + // Possible that the index was created while the request was executing, + // so we need to handle that possibility + if (e instanceof ResourceAlreadyExistsException) { + LOGGER.info("Index already exists"); + // Create the alias + createdListener.onResponse(true); + } else { + createdListener.onFailure(e); + } + } + )); + } else { + // Trigger the alias creation handler manually, since the index already exists + listener.onResponse(true); + } + } catch (Exception e) { listener.onFailure(e); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java index 0c0156e0fa0..8651d27e284 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/AutodetectResultProcessorIT.java @@ -6,6 +6,12 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -18,6 +24,7 @@ import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; @@ -56,6 +63,9 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -229,6 +239,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { assertResultsAreSame(allRecords, persistedRecords); } + @SuppressWarnings("unchecked") private void createJob() { Detector.Builder detectorBuilder = new Detector.Builder("avg", "metric_field"); detectorBuilder.setByFieldName("by_instance"); @@ -237,7 +248,18 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase { analysisConfBuilder.setInfluencers(Collections.singletonList("influence_field")); jobBuilder.setAnalysisConfig(analysisConfBuilder); - jobProvider.createJobResultIndex(jobBuilder.build(), new ActionListener() { + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(ImmutableOpenMap.of())).build(); + + ClusterService clusterService = mock(ClusterService.class); + + doAnswer(invocationOnMock -> { + AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; + task.execute(cs); + return null; + }).when(clusterService).submitStateUpdateTask(eq("put-job-" + JOB_ID), any(AckedClusterStateUpdateTask.class)); + + jobProvider.createJobResultIndex(jobBuilder.build(), cs, new ActionListener() { @Override public void onResponse(Boolean aBoolean) { } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index efa57c8c4f1..ab4153486e1 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -213,50 +213,79 @@ public class MlJobIT extends ESRestTestCase { " },\n" + " \"index_name\" : \"%s\"}"; - String jobId = "aliased-job"; + String jobConfig = String.format(Locale.ROOT, jobTemplate, "index-1"); + + Response response = client().performRequest("put", MachineLearning.BASE_PATH + + "anomaly_detectors/repeated-id" , Collections.emptyMap(), new StringEntity(jobConfig)); + assertEquals(200, response.getStatusLine().getStatusCode()); + + final String jobConfig2 = String.format(Locale.ROOT, jobTemplate, "index-2"); + ResponseException e = expectThrows(ResponseException.class, + () ->client().performRequest("put", MachineLearning.BASE_PATH + + "anomaly_detectors/repeated-id" , Collections.emptyMap(), new StringEntity(jobConfig2))); + + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400)); + assertThat(e.getMessage(), containsString("The job cannot be created with the Id 'repeated-id'. The Id is already used.")); + } + + public void testCreateJobsWithIndexNameOption() throws Exception { + String jobTemplate = "{\n" + + " \"analysis_config\" : {\n" + + " \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\"}]\n" + + " },\n" + + " \"index_name\" : \"%s\"}"; + + String jobId1 = "aliased-job-1"; String indexName = "non-default-index"; String jobConfig = String.format(Locale.ROOT, jobTemplate, indexName); - Response response = client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(), - new StringEntity(jobConfig)); + Response response = client().performRequest("put", MachineLearning.BASE_PATH + + "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig)); + assertEquals(200, response.getStatusLine().getStatusCode()); + + String jobId2 = "aliased-job-2"; + response = client().performRequest("put", MachineLearning.BASE_PATH + + "anomaly_detectors/" + jobId2, Collections.emptyMap(), new StringEntity(jobConfig)); assertEquals(200, response.getStatusLine().getStatusCode()); response = client().performRequest("get", "_aliases"); assertEquals(200, response.getStatusLine().getStatusCode()); String responseAsString = responseEntityToString(response); + assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsIndexName(indexName) - + "\":{\"aliases\":{\"" + AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "\"")); + + "\":{\"aliases\":{\"" + AnomalyDetectorsIndex.jobResultsIndexName(jobId1) + "\":{},\"" + + AnomalyDetectorsIndex.jobResultsIndexName(jobId2))); response = client().performRequest("get", "_cat/indices"); assertEquals(200, response.getStatusLine().getStatusCode()); responseAsString = responseEntityToString(response); assertThat(responseAsString, containsString(indexName)); + assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId1)))); + assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId2)))); addBucketResult(indexName, "1234", 1); addBucketResult(indexName, "1236", 1); - response = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/results/buckets"); + response = client().performRequest("get", MachineLearning.BASE_PATH + + "anomaly_detectors/" + jobId1 + "/results/buckets"); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); responseAsString = responseEntityToString(response); assertThat(responseAsString, containsString("\"count\":2")); - response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(indexName) + "/result/_search"); + response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(indexName) + + "/result/_search"); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); responseAsString = responseEntityToString(response); assertThat(responseAsString, containsString("\"total\":2")); - // test that we can't create another job with the same index_name - String jobConfigSameIndexName = String.format(Locale.ROOT, jobTemplate, "new-job-id", indexName); - expectThrows(ResponseException.class, () -> client().performRequest("put", - MachineLearning.BASE_PATH + "anomaly_detectors", Collections.emptyMap(), new StringEntity(jobConfigSameIndexName))); - - response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); + response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); // check index and alias were deleted response = client().performRequest("get", "_aliases"); assertEquals(200, response.getStatusLine().getStatusCode()); responseAsString = responseEntityToString(response); - assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId)))); + assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId1)))); + assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId2)))); response = client().performRequest("get", "_cat/indices"); assertEquals(200, response.getStatusLine().getStatusCode()); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java index 35ab8dced2d..77957f1dfe6 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/JobManagerTests.java @@ -5,26 +5,17 @@ */ package org.elasticsearch.xpack.ml.job; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.AliasMetaData; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; -import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; -import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.notifications.Auditor; @@ -37,10 +28,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -132,48 +120,6 @@ public class JobManagerTests extends ESTestCase { assertThat(result.results().get(9).getId(), equalTo("9")); } - @SuppressWarnings("unchecked") - public void testPutJobFailsIfIndexExists() { - JobManager jobManager = createJobManager(); - Job.Builder jobBuilder = buildJobBuilder("foo"); - jobBuilder.setIndexName("my-special-place"); - PutJobAction.Request request = new PutJobAction.Request(jobBuilder.build()); - - Index index = mock(Index.class); - when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("my-special-place")); - IndexMetaData indexMetaData = mock(IndexMetaData.class); - when(indexMetaData.getIndex()).thenReturn(index); - ImmutableOpenMap aliases = ImmutableOpenMap.of(); - when(indexMetaData.getAliases()).thenReturn(aliases); - - ImmutableOpenMap indexMap = ImmutableOpenMap.builder() - .fPut(AnomalyDetectorsIndex.jobResultsIndexName("my-special-place"), indexMetaData).build(); - - ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); - - doAnswer(invocationOnMock -> { - AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; - task.execute(cs); - return null; - }).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class)); - - ResourceAlreadyExistsException e = expectThrows(ResourceAlreadyExistsException.class, () -> jobManager.putJob(request, - new ActionListener() { - @Override - public void onResponse(PutJobAction.Response response) { - } - - @Override - public void onFailure(Exception e) { - fail(e.toString()); - } - })); - - assertEquals("Cannot create index '.ml-anomalies-my-special-place' as it already exists", e.getMessage()); - } - - private JobManager createJobManager() { Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java index eb7cd61756d..d26d6c710b1 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/persistence/JobProviderTests.java @@ -15,10 +15,19 @@ import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.Index; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; @@ -28,6 +37,7 @@ import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery; import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; @@ -63,6 +73,7 @@ import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -133,6 +144,7 @@ public class JobProviderTests extends ESTestCase { assertEquals("all_field_values", settings.get("index.query.default_field")); } + @SuppressWarnings("unchecked") public void testCreateJobResultsIndex() { MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); @@ -141,7 +153,18 @@ public class JobProviderTests extends ESTestCase { Job.Builder job = buildJobBuilder("foo"); JobProvider provider = createProvider(clientBuilder.build()); - provider.createJobResultIndex(job.build(), new ActionListener() { + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(ImmutableOpenMap.of())).build(); + + ClusterService clusterService = mock(ClusterService.class); + + doAnswer(invocationOnMock -> { + AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; + task.execute(cs); + return null; + }).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class)); + + provider.createJobResultIndex(job.build(), cs, new ActionListener() { @Override public void onResponse(Boolean aBoolean) { CreateIndexRequest request = captor.getValue(); @@ -163,6 +186,58 @@ public class JobProviderTests extends ESTestCase { }); } + @SuppressWarnings("unchecked") + public void testCreateJobWithExistingIndex() { + MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); + ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); + clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsIndexName("foo"), AnomalyDetectorsIndex.jobResultsIndexName("foo123")); + + Job.Builder job = buildJobBuilder("foo123"); + job.setIndexName("foo"); + JobProvider provider = createProvider(clientBuilder.build()); + + Index index = mock(Index.class); + when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("foo")); + IndexMetaData indexMetaData = mock(IndexMetaData.class); + when(indexMetaData.getIndex()).thenReturn(index); + + ImmutableOpenMap aliases = ImmutableOpenMap.of(); + when(indexMetaData.getAliases()).thenReturn(aliases); + + ImmutableOpenMap indexMap = ImmutableOpenMap.builder() + .fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build(); + + ClusterState cs2 = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); + + ClusterService clusterService = mock(ClusterService.class); + + doAnswer(invocationOnMock -> { + AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; + task.execute(cs2); + return null; + }).when(clusterService).submitStateUpdateTask(eq("put-job-foo123"), any(AckedClusterStateUpdateTask.class)); + + doAnswer(invocationOnMock -> { + AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; + task.execute(cs2); + return null; + }).when(clusterService).submitStateUpdateTask(eq("index-aliases"), any(AckedClusterStateUpdateTask.class)); + + provider.createJobResultIndex(job.build(), cs2, new ActionListener() { + @Override + public void onResponse(Boolean aBoolean) { + assertTrue(aBoolean); + } + + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + }); + } + + @SuppressWarnings("unchecked") public void testCreateJobRelatedIndicies_createsAliasIfIndexNameIsSet() { MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); @@ -174,7 +249,28 @@ public class JobProviderTests extends ESTestCase { Client client = clientBuilder.build(); JobProvider provider = createProvider(client); - provider.createJobResultIndex(job.build(), new ActionListener() { + Index index = mock(Index.class); + when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("foo")); + IndexMetaData indexMetaData = mock(IndexMetaData.class); + when(indexMetaData.getIndex()).thenReturn(index); + ImmutableOpenMap aliases = ImmutableOpenMap.of(); + when(indexMetaData.getAliases()).thenReturn(aliases); + + ImmutableOpenMap indexMap = ImmutableOpenMap.builder() + .fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build(); + + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); + + ClusterService clusterService = mock(ClusterService.class); + + doAnswer(invocationOnMock -> { + AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; + task.execute(cs); + return null; + }).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class)); + + provider.createJobResultIndex(job.build(), cs, new ActionListener() { @Override public void onResponse(Boolean aBoolean) { verify(client.admin().indices(), times(1)).prepareAliases(); @@ -187,6 +283,7 @@ public class JobProviderTests extends ESTestCase { }); } + @SuppressWarnings("unchecked") public void testCreateJobRelatedIndicies_doesntCreateAliasIfIndexNameIsSameAsJobId() { MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); @@ -197,7 +294,28 @@ public class JobProviderTests extends ESTestCase { Client client = clientBuilder.build(); JobProvider provider = createProvider(client); - provider.createJobResultIndex(job.build(), new ActionListener() { + Index index = mock(Index.class); + when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("foo")); + IndexMetaData indexMetaData = mock(IndexMetaData.class); + when(indexMetaData.getIndex()).thenReturn(index); + ImmutableOpenMap aliases = ImmutableOpenMap.of(); + when(indexMetaData.getAliases()).thenReturn(aliases); + + ImmutableOpenMap indexMap = ImmutableOpenMap.builder() + .fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build(); + + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); + + ClusterService clusterService = mock(ClusterService.class); + + doAnswer(invocationOnMock -> { + AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; + task.execute(cs); + return null; + }).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class)); + + provider.createJobResultIndex(job.build(), cs, new ActionListener() { @Override public void onResponse(Boolean aBoolean) { verify(client.admin().indices(), never()).prepareAliases(); @@ -289,6 +407,7 @@ public class JobProviderTests extends ESTestCase { }); } + @SuppressWarnings("unchecked") public void testCreateJob() throws InterruptedException, ExecutionException { Job.Builder job = buildJobBuilder("marscapone"); job.setDescription("This is a very cheesy job"); @@ -302,7 +421,29 @@ public class JobProviderTests extends ESTestCase { Client client = clientBuilder.build(); JobProvider provider = createProvider(client); AtomicReference resultHolder = new AtomicReference<>(); - provider.createJobResultIndex(job.build(), new ActionListener() { + + Index index = mock(Index.class); + when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("marscapone")); + IndexMetaData indexMetaData = mock(IndexMetaData.class); + when(indexMetaData.getIndex()).thenReturn(index); + ImmutableOpenMap aliases = ImmutableOpenMap.of(); + when(indexMetaData.getAliases()).thenReturn(aliases); + + ImmutableOpenMap indexMap = ImmutableOpenMap.builder() + .fPut(AnomalyDetectorsIndex.jobResultsIndexName("marscapone"), indexMetaData).build(); + + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); + + ClusterService clusterService = mock(ClusterService.class); + + doAnswer(invocationOnMock -> { + AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocationOnMock.getArguments()[1]; + task.execute(cs); + return null; + }).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class)); + + provider.createJobResultIndex(job.build(), cs, new ActionListener() { @Override public void onResponse(Boolean aBoolean) { resultHolder.set(aBoolean);