[ML] Allow result indices to be shared. (elastic/x-pack-elasticsearch#555)

Essentially an update to https://github.com/elastic/prelert-legacy/pull/736 .  Still does not
default to using shared indices, but adds the capability for two jobs to share the same one
without conflict

Still does not default to using shared indices, just adds the capability for two jobs to share the same one without conflict.

Original commit: elastic/x-pack-elasticsearch@60d93a06ea
This commit is contained in:
Zachary Tong 2017-02-14 14:59:28 -05:00 committed by GitHub
parent 72ef1acdc0
commit cb29cbd8a9
7 changed files with 248 additions and 93 deletions

View File

@ -202,7 +202,7 @@ public class PutJobAction extends Action<PutJobAction.Request, PutJobAction.Resp
@Override @Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception { protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
jobManager.putJob(request, listener); jobManager.putJob(request, state, listener);
} }
@Override @Override

View File

@ -162,11 +162,11 @@ public class JobManager extends AbstractComponent {
/** /**
* Stores a job in the cluster state * Stores a job in the cluster state
*/ */
public void putJob(PutJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) { public void putJob(PutJobAction.Request request, ClusterState state, ActionListener<PutJobAction.Response> actionListener) {
Job job = request.getJob(); Job job = request.getJob();
ActionListener<Boolean> createResultsIndexListener = ActionListener.wrap(jobSaved -> ActionListener<Boolean> createResultsIndexListener = ActionListener.wrap(jobSaved ->
jobProvider.createJobResultIndex(job, new ActionListener<Boolean>() { jobProvider.createJobResultIndex(job, state, new ActionListener<Boolean>() {
@Override @Override
public void onResponse(Boolean indicesCreated) { public void onResponse(Boolean indicesCreated) {
audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED)); audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED));
@ -193,12 +193,7 @@ public class JobManager extends AbstractComponent {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState cs = updateClusterState(job, false, currentState); return updateClusterState(job, false, currentState);
if (currentState.metaData().index(AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName())) != null) {
throw new ResourceAlreadyExistsException(Messages.getMessage(Messages.JOB_INDEX_ALREADY_EXISTS,
AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName())));
}
return cs;
} }
}); });
} }

View File

@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator; import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@ -22,6 +23,7 @@ import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
@ -235,7 +237,7 @@ public class JobProvider {
/** /**
* Create the Elasticsearch index and the mappings * Create the Elasticsearch index and the mappings
*/ */
public void createJobResultIndex(Job job, ActionListener<Boolean> listener) { public void createJobResultIndex(Job job, ClusterState state, ActionListener<Boolean> listener) {
Collection<String> termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList(); Collection<String> termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList();
try { try {
XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping(termFields); XContentBuilder resultsMapping = ElasticsearchMappings.resultsMapping(termFields);
@ -247,14 +249,6 @@ public class JobProvider {
boolean createIndexAlias = !job.getIndexName().equals(job.getId()); boolean createIndexAlias = !job.getIndexName().equals(job.getId());
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName()); String indexName = AnomalyDetectorsIndex.jobResultsIndexName(job.getIndexName());
LOGGER.trace("ES API CALL: create index {}", indexName);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
createIndexRequest.settings(mlResultsIndexSettings());
createIndexRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping);
createIndexRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping);
createIndexRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping);
createIndexRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping);
if (createIndexAlias) { if (createIndexAlias) {
final ActionListener<Boolean> responseListener = listener; final ActionListener<Boolean> responseListener = listener;
listener = ActionListener.wrap(aBoolean -> { listener = ActionListener.wrap(aBoolean -> {
@ -265,9 +259,37 @@ public class JobProvider {
listener::onFailure); listener::onFailure);
} }
final ActionListener<Boolean> createdListener = listener; // Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if
client.admin().indices().create(createIndexRequest, // already in the CS
ActionListener.wrap(r -> createdListener.onResponse(true), createdListener::onFailure)); if (!state.getMetaData().hasIndex(indexName)) {
LOGGER.trace("ES API CALL: create index {}", indexName);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
createIndexRequest.settings(mlResultsIndexSettings());
createIndexRequest.mapping(Result.TYPE.getPreferredName(), resultsMapping);
createIndexRequest.mapping(CategoryDefinition.TYPE.getPreferredName(), categoryDefinitionMapping);
createIndexRequest.mapping(DataCounts.TYPE.getPreferredName(), dataCountsMapping);
createIndexRequest.mapping(ModelSnapshot.TYPE.getPreferredName(), modelSnapshotMapping);
final ActionListener<Boolean> createdListener = listener;
client.admin().indices().create(createIndexRequest,
ActionListener.wrap(r -> createdListener.onResponse(true),
e -> {
// Possible that the index was created while the request was executing,
// so we need to handle that possibility
if (e instanceof ResourceAlreadyExistsException) {
LOGGER.info("Index already exists");
// Create the alias
createdListener.onResponse(true);
} else {
createdListener.onFailure(e);
}
}
));
} else {
// Trigger the alias creation handler manually, since the index already exists
listener.onResponse(true);
}
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
} }

View File

@ -6,6 +6,12 @@
package org.elasticsearch.xpack.ml.integration; package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -18,6 +24,7 @@ import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Detector; import org.elasticsearch.xpack.ml.job.config.Detector;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
@ -56,6 +63,9 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -229,6 +239,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
assertResultsAreSame(allRecords, persistedRecords); assertResultsAreSame(allRecords, persistedRecords);
} }
@SuppressWarnings("unchecked")
private void createJob() { private void createJob() {
Detector.Builder detectorBuilder = new Detector.Builder("avg", "metric_field"); Detector.Builder detectorBuilder = new Detector.Builder("avg", "metric_field");
detectorBuilder.setByFieldName("by_instance"); detectorBuilder.setByFieldName("by_instance");
@ -237,7 +248,18 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
analysisConfBuilder.setInfluencers(Collections.singletonList("influence_field")); analysisConfBuilder.setInfluencers(Collections.singletonList("influence_field"));
jobBuilder.setAnalysisConfig(analysisConfBuilder); jobBuilder.setAnalysisConfig(analysisConfBuilder);
jobProvider.createJobResultIndex(jobBuilder.build(), new ActionListener<Boolean>() { ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(ImmutableOpenMap.of())).build();
ClusterService clusterService = mock(ClusterService.class);
doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs);
return null;
}).when(clusterService).submitStateUpdateTask(eq("put-job-" + JOB_ID), any(AckedClusterStateUpdateTask.class));
jobProvider.createJobResultIndex(jobBuilder.build(), cs, new ActionListener<Boolean>() {
@Override @Override
public void onResponse(Boolean aBoolean) { public void onResponse(Boolean aBoolean) {
} }

View File

@ -213,50 +213,79 @@ public class MlJobIT extends ESRestTestCase {
" },\n" + " },\n" +
" \"index_name\" : \"%s\"}"; " \"index_name\" : \"%s\"}";
String jobId = "aliased-job"; String jobConfig = String.format(Locale.ROOT, jobTemplate, "index-1");
Response response = client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/repeated-id" , Collections.emptyMap(), new StringEntity(jobConfig));
assertEquals(200, response.getStatusLine().getStatusCode());
final String jobConfig2 = String.format(Locale.ROOT, jobTemplate, "index-2");
ResponseException e = expectThrows(ResponseException.class,
() ->client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/repeated-id" , Collections.emptyMap(), new StringEntity(jobConfig2)));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
assertThat(e.getMessage(), containsString("The job cannot be created with the Id 'repeated-id'. The Id is already used."));
}
public void testCreateJobsWithIndexNameOption() throws Exception {
String jobTemplate = "{\n" +
" \"analysis_config\" : {\n" +
" \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\"}]\n" +
" },\n" +
" \"index_name\" : \"%s\"}";
String jobId1 = "aliased-job-1";
String indexName = "non-default-index"; String indexName = "non-default-index";
String jobConfig = String.format(Locale.ROOT, jobTemplate, indexName); String jobConfig = String.format(Locale.ROOT, jobTemplate, indexName);
Response response = client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(), Response response = client().performRequest("put", MachineLearning.BASE_PATH
new StringEntity(jobConfig)); + "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig));
assertEquals(200, response.getStatusLine().getStatusCode());
String jobId2 = "aliased-job-2";
response = client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/" + jobId2, Collections.emptyMap(), new StringEntity(jobConfig));
assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(200, response.getStatusLine().getStatusCode());
response = client().performRequest("get", "_aliases"); response = client().performRequest("get", "_aliases");
assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(200, response.getStatusLine().getStatusCode());
String responseAsString = responseEntityToString(response); String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsIndexName(indexName) assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsIndexName(indexName)
+ "\":{\"aliases\":{\"" + AnomalyDetectorsIndex.jobResultsIndexName(jobId) + "\"")); + "\":{\"aliases\":{\"" + AnomalyDetectorsIndex.jobResultsIndexName(jobId1) + "\":{},\"" +
AnomalyDetectorsIndex.jobResultsIndexName(jobId2)));
response = client().performRequest("get", "_cat/indices"); response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response); responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(indexName)); assertThat(responseAsString, containsString(indexName));
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId1))));
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId2))));
addBucketResult(indexName, "1234", 1); addBucketResult(indexName, "1234", 1);
addBucketResult(indexName, "1236", 1); addBucketResult(indexName, "1236", 1);
response = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/results/buckets"); response = client().performRequest("get", MachineLearning.BASE_PATH
+ "anomaly_detectors/" + jobId1 + "/results/buckets");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
responseAsString = responseEntityToString(response); responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":2")); assertThat(responseAsString, containsString("\"count\":2"));
response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(indexName) + "/result/_search"); response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName(indexName)
+ "/result/_search");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
responseAsString = responseEntityToString(response); responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"total\":2")); assertThat(responseAsString, containsString("\"total\":2"));
// test that we can't create another job with the same index_name response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1);
String jobConfigSameIndexName = String.format(Locale.ROOT, jobTemplate, "new-job-id", indexName);
expectThrows(ResponseException.class, () -> client().performRequest("put",
MachineLearning.BASE_PATH + "anomaly_detectors", Collections.emptyMap(), new StringEntity(jobConfigSameIndexName)));
response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
// check index and alias were deleted // check index and alias were deleted
response = client().performRequest("get", "_aliases"); response = client().performRequest("get", "_aliases");
assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response); responseAsString = responseEntityToString(response);
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId)))); assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId1))));
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsIndexName(jobId2))));
response = client().performRequest("get", "_cat/indices"); response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode()); assertEquals(200, response.getStatusLine().getStatusCode());

View File

@ -5,26 +5,17 @@
*/ */
package org.elasticsearch.xpack.ml.job; package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata; import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.notifications.Auditor;
@ -37,10 +28,7 @@ import java.util.stream.Collectors;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder; import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -132,48 +120,6 @@ public class JobManagerTests extends ESTestCase {
assertThat(result.results().get(9).getId(), equalTo("9")); assertThat(result.results().get(9).getId(), equalTo("9"));
} }
@SuppressWarnings("unchecked")
public void testPutJobFailsIfIndexExists() {
JobManager jobManager = createJobManager();
Job.Builder jobBuilder = buildJobBuilder("foo");
jobBuilder.setIndexName("my-special-place");
PutJobAction.Request request = new PutJobAction.Request(jobBuilder.build());
Index index = mock(Index.class);
when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("my-special-place"));
IndexMetaData indexMetaData = mock(IndexMetaData.class);
when(indexMetaData.getIndex()).thenReturn(index);
ImmutableOpenMap<String, AliasMetaData> aliases = ImmutableOpenMap.of();
when(indexMetaData.getAliases()).thenReturn(aliases);
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("my-special-place"), indexMetaData).build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build();
doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs);
return null;
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class));
ResourceAlreadyExistsException e = expectThrows(ResourceAlreadyExistsException.class, () -> jobManager.putJob(request,
new ActionListener<PutJobAction.Response>() {
@Override
public void onResponse(PutJobAction.Response response) {
}
@Override
public void onFailure(Exception e) {
fail(e.toString());
}
}));
assertEquals("Cannot create index '.ml-anomalies-my-special-place' as it already exists", e.getMessage());
}
private JobManager createJobManager() { private JobManager createJobManager() {
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(); Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build();
JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class); JobResultsPersister jobResultsPersister = mock(JobResultsPersister.class);

View File

@ -15,10 +15,19 @@ import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text; import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.SearchHitField;
@ -28,6 +37,7 @@ import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.ml.job.config.AnalysisLimits;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.metadata.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState; import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
@ -63,6 +73,7 @@ import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -133,6 +144,7 @@ public class JobProviderTests extends ESTestCase {
assertEquals("all_field_values", settings.get("index.query.default_field")); assertEquals("all_field_values", settings.get("index.query.default_field"));
} }
@SuppressWarnings("unchecked")
public void testCreateJobResultsIndex() { public void testCreateJobResultsIndex() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class); ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
@ -141,7 +153,18 @@ public class JobProviderTests extends ESTestCase {
Job.Builder job = buildJobBuilder("foo"); Job.Builder job = buildJobBuilder("foo");
JobProvider provider = createProvider(clientBuilder.build()); JobProvider provider = createProvider(clientBuilder.build());
provider.createJobResultIndex(job.build(), new ActionListener<Boolean>() { ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(ImmutableOpenMap.of())).build();
ClusterService clusterService = mock(ClusterService.class);
doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs);
return null;
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class));
provider.createJobResultIndex(job.build(), cs, new ActionListener<Boolean>() {
@Override @Override
public void onResponse(Boolean aBoolean) { public void onResponse(Boolean aBoolean) {
CreateIndexRequest request = captor.getValue(); CreateIndexRequest request = captor.getValue();
@ -163,6 +186,58 @@ public class JobProviderTests extends ESTestCase {
}); });
} }
@SuppressWarnings("unchecked")
public void testCreateJobWithExistingIndex() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsIndexName("foo"), AnomalyDetectorsIndex.jobResultsIndexName("foo123"));
Job.Builder job = buildJobBuilder("foo123");
job.setIndexName("foo");
JobProvider provider = createProvider(clientBuilder.build());
Index index = mock(Index.class);
when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("foo"));
IndexMetaData indexMetaData = mock(IndexMetaData.class);
when(indexMetaData.getIndex()).thenReturn(index);
ImmutableOpenMap<String, AliasMetaData> aliases = ImmutableOpenMap.of();
when(indexMetaData.getAliases()).thenReturn(aliases);
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build();
ClusterState cs2 = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build();
ClusterService clusterService = mock(ClusterService.class);
doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs2);
return null;
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo123"), any(AckedClusterStateUpdateTask.class));
doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs2);
return null;
}).when(clusterService).submitStateUpdateTask(eq("index-aliases"), any(AckedClusterStateUpdateTask.class));
provider.createJobResultIndex(job.build(), cs2, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
assertTrue(aBoolean);
}
@Override
public void onFailure(Exception e) {
fail(e.toString());
}
});
}
@SuppressWarnings("unchecked")
public void testCreateJobRelatedIndicies_createsAliasIfIndexNameIsSet() { public void testCreateJobRelatedIndicies_createsAliasIfIndexNameIsSet() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class); ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
@ -174,7 +249,28 @@ public class JobProviderTests extends ESTestCase {
Client client = clientBuilder.build(); Client client = clientBuilder.build();
JobProvider provider = createProvider(client); JobProvider provider = createProvider(client);
provider.createJobResultIndex(job.build(), new ActionListener<Boolean>() { Index index = mock(Index.class);
when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("foo"));
IndexMetaData indexMetaData = mock(IndexMetaData.class);
when(indexMetaData.getIndex()).thenReturn(index);
ImmutableOpenMap<String, AliasMetaData> aliases = ImmutableOpenMap.of();
when(indexMetaData.getAliases()).thenReturn(aliases);
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build();
ClusterService clusterService = mock(ClusterService.class);
doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs);
return null;
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class));
provider.createJobResultIndex(job.build(), cs, new ActionListener<Boolean>() {
@Override @Override
public void onResponse(Boolean aBoolean) { public void onResponse(Boolean aBoolean) {
verify(client.admin().indices(), times(1)).prepareAliases(); verify(client.admin().indices(), times(1)).prepareAliases();
@ -187,6 +283,7 @@ public class JobProviderTests extends ESTestCase {
}); });
} }
@SuppressWarnings("unchecked")
public void testCreateJobRelatedIndicies_doesntCreateAliasIfIndexNameIsSameAsJobId() { public void testCreateJobRelatedIndicies_doesntCreateAliasIfIndexNameIsSameAsJobId() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class); ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
@ -197,7 +294,28 @@ public class JobProviderTests extends ESTestCase {
Client client = clientBuilder.build(); Client client = clientBuilder.build();
JobProvider provider = createProvider(client); JobProvider provider = createProvider(client);
provider.createJobResultIndex(job.build(), new ActionListener<Boolean>() { Index index = mock(Index.class);
when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("foo"));
IndexMetaData indexMetaData = mock(IndexMetaData.class);
when(indexMetaData.getIndex()).thenReturn(index);
ImmutableOpenMap<String, AliasMetaData> aliases = ImmutableOpenMap.of();
when(indexMetaData.getAliases()).thenReturn(aliases);
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build();
ClusterService clusterService = mock(ClusterService.class);
doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs);
return null;
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class));
provider.createJobResultIndex(job.build(), cs, new ActionListener<Boolean>() {
@Override @Override
public void onResponse(Boolean aBoolean) { public void onResponse(Boolean aBoolean) {
verify(client.admin().indices(), never()).prepareAliases(); verify(client.admin().indices(), never()).prepareAliases();
@ -289,6 +407,7 @@ public class JobProviderTests extends ESTestCase {
}); });
} }
@SuppressWarnings("unchecked")
public void testCreateJob() throws InterruptedException, ExecutionException { public void testCreateJob() throws InterruptedException, ExecutionException {
Job.Builder job = buildJobBuilder("marscapone"); Job.Builder job = buildJobBuilder("marscapone");
job.setDescription("This is a very cheesy job"); job.setDescription("This is a very cheesy job");
@ -302,7 +421,29 @@ public class JobProviderTests extends ESTestCase {
Client client = clientBuilder.build(); Client client = clientBuilder.build();
JobProvider provider = createProvider(client); JobProvider provider = createProvider(client);
AtomicReference<Boolean> resultHolder = new AtomicReference<>(); AtomicReference<Boolean> resultHolder = new AtomicReference<>();
provider.createJobResultIndex(job.build(), new ActionListener<Boolean>() {
Index index = mock(Index.class);
when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("marscapone"));
IndexMetaData indexMetaData = mock(IndexMetaData.class);
when(indexMetaData.getIndex()).thenReturn(index);
ImmutableOpenMap<String, AliasMetaData> aliases = ImmutableOpenMap.of();
when(indexMetaData.getAliases()).thenReturn(aliases);
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("marscapone"), indexMetaData).build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build();
ClusterService clusterService = mock(ClusterService.class);
doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs);
return null;
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo"), any(AckedClusterStateUpdateTask.class));
provider.createJobResultIndex(job.build(), cs, new ActionListener<Boolean>() {
@Override @Override
public void onResponse(Boolean aBoolean) { public void onResponse(Boolean aBoolean) {
resultHolder.set(aBoolean); resultHolder.set(aBoolean);