[ML] Update index mapping for jobs in shared indices (elastic/x-pack-elasticsearch#572)

* Update input field mappings if job results index exists

* Add field names to the index mapping in shared indices

* Throw error if creating job will violate the index.mapping.total_fields.limit

* Review comments

Original commit: elastic/x-pack-elasticsearch@d7712d9263
This commit is contained in:
David Kyle 2017-02-16 17:34:47 +00:00 committed by GitHub
parent 6ec6db8544
commit 15987f49c0
8 changed files with 237 additions and 59 deletions

View File

@ -242,15 +242,9 @@ public class MachineLearning extends Plugin implements ActionPlugin {
if (false == enabled || this.transportClientMode) { if (false == enabled || this.transportClientMode) {
return emptyList(); return emptyList();
} }
// Whether we are using native process is a good way to detect whether we are in dev / test mode:
TimeValue delayedNodeTimeOutSetting;
if (USE_NATIVE_PROCESS_OPTION.get(settings)) {
delayedNodeTimeOutSetting = UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings);
} else {
delayedNodeTimeOutSetting = TimeValue.timeValueNanos(0);
}
JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client); JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client);
JobProvider jobProvider = new JobProvider(client, 1, delayedNodeTimeOutSetting); JobProvider jobProvider = new JobProvider(client, 1, settings);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client); JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService); JobManager jobManager = new JobManager(settings, jobProvider, jobResultsPersister, clusterService);

View File

@ -255,11 +255,7 @@ public class ElasticsearchMappings {
addInfluencerFieldsToMapping(builder); addInfluencerFieldsToMapping(builder);
addModelSizeStatsFieldsToMapping(builder); addModelSizeStatsFieldsToMapping(builder);
for (String fieldName : termFieldNames) { addTermFields(builder, termFieldNames);
if (ReservedFieldNames.isValidFieldName(fieldName)) {
builder.startObject(fieldName).field(TYPE, KEYWORD).endObject();
}
}
// End result properties // End result properties
builder.endObject(); builder.endObject();
@ -271,6 +267,20 @@ public class ElasticsearchMappings {
return builder; return builder;
} }
public static XContentBuilder termFieldsMapping(Collection<String> termFields) throws IOException {
XContentBuilder builder = jsonBuilder().startObject().startObject(PROPERTIES);
addTermFields(builder, termFields);
return builder.endObject().endObject();
}
private static void addTermFields(XContentBuilder builder, Collection<String> termFields) throws IOException {
for (String fieldName : termFields) {
if (ReservedFieldNames.isValidFieldName(fieldName)) {
builder.startObject(fieldName).field(TYPE, KEYWORD).endObject();
}
}
}
/** /**
* AnomalyRecord fields to be added under the 'properties' section of the mapping * AnomalyRecord fields to be added under the 'properties' section of the mapping
* @param builder Add properties to this builder * @param builder Add properties to this builder

View File

@ -14,6 +14,8 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetItemResponse;
@ -25,9 +27,11 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -51,6 +55,7 @@ import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
@ -85,6 +90,7 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
@ -124,10 +130,19 @@ public class JobProvider {
// for at least a minute for shards to get allocated. // for at least a minute for shards to get allocated.
private final TimeValue delayedNodeTimeOutSetting; private final TimeValue delayedNodeTimeOutSetting;
public JobProvider(Client client, int numberOfReplicas, TimeValue delayedNodeTimeOutSetting) { private final Settings settings;
public JobProvider(Client client, int numberOfReplicas, Settings settings) {
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.numberOfReplicas = numberOfReplicas; this.numberOfReplicas = numberOfReplicas;
this.delayedNodeTimeOutSetting = delayedNodeTimeOutSetting; this.settings = settings;
// Whether we are using native process is a good way to detect whether we are in dev / test mode:
if (MachineLearning.USE_NATIVE_PROCESS_OPTION.get(settings)) {
delayedNodeTimeOutSetting = UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings);
} else {
delayedNodeTimeOutSetting = TimeValue.timeValueNanos(0);
}
} }
/** /**
@ -286,8 +301,8 @@ public class JobProvider {
} }
)); ));
} else { } else {
// Trigger the alias creation handler manually, since the index already exists // Add the job's term fields to the index mapping
listener.onResponse(true); updateIndexMappingWithTermFields(indexName, termFields, listener);
} }
} catch (Exception e) { } catch (Exception e) {
@ -295,6 +310,71 @@ public class JobProvider {
} }
} }
private void updateIndexMappingWithTermFields(String indexName, Collection<String> termFields, ActionListener<Boolean> listener) {
checkNumberOfFieldsLimit(indexName, termFields.size(), new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean aBoolean) {
try {
client.admin().indices().preparePutMapping(indexName).setType(Result.TYPE.getPreferredName())
.setSource(ElasticsearchMappings.termFieldsMapping(termFields))
.execute(new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse putMappingResponse) {
listener.onResponse(putMappingResponse.isAcknowledged());
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} catch (Exception e) {
listener.onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
private void checkNumberOfFieldsLimit(String indexName, long additionalFieldCount, ActionListener<Boolean> listener) {
client.admin().indices().prepareGetMappings(indexName).execute(new ActionListener<GetMappingsResponse>() {
@Override
public void onResponse(GetMappingsResponse getMappingsResponse) {
ImmutableOpenMap<String, MappingMetaData> typeMappings = getMappingsResponse.mappings().get(indexName);
Iterator<com.carrotsearch.hppc.cursors.ObjectObjectCursor<String, MappingMetaData>> iter = typeMappings.iterator();
long numFields = 0;
try {
while (iter.hasNext()) {
Map<String, Object> props = (Map<String, Object>)iter.next().value.getSourceAsMap().get("properties");
numFields += props.size();
}
}
catch (IOException e) {
listener.onFailure(e);
}
long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings);
if (numFields + additionalFieldCount > fieldCountLimit) {
String message = "Cannot create job in index '" + indexName + "' as the " +
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated";
listener.onFailure(new IllegalArgumentException(message));
} else {
listener.onResponse(true);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
public void createJobStateIndex(BiConsumer<Boolean, Exception> listener) { public void createJobStateIndex(BiConsumer<Boolean, Exception> listener) {
try { try {
XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping(); XContentBuilder categorizerStateMapping = ElasticsearchMappings.categorizerStateMapping();
@ -315,7 +395,6 @@ public class JobProvider {
} }
} }
/** /**
* Delete all the job related documents from the database. * Delete all the job related documents from the database.
*/ */

View File

@ -10,6 +10,7 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
@ -91,7 +92,9 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
public void createComponents() { public void createComponents() {
renormalizer = new NoOpRenormalizer(); renormalizer = new NoOpRenormalizer();
jobResultsPersister = new JobResultsPersister(nodeSettings(), client()); jobResultsPersister = new JobResultsPersister(nodeSettings(), client());
jobProvider = new JobProvider(client(), 1, TimeValue.timeValueSeconds(1)); Settings.Builder builder = Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
jobProvider = new JobProvider(client(), 1, builder.build());
} }
public void testProcessResults() throws Exception { public void testProcessResults() throws Exception {

View File

@ -206,7 +206,7 @@ public class MlJobIT extends ESRestTestCase {
assertThat(responseAsString, containsString("\"count\":1")); assertThat(responseAsString, containsString("\"count\":1"));
} }
public void testCreateJobWithIndexNameOption() throws Exception { public void testCantCreateJobWithSameID() throws Exception {
String jobTemplate = "{\n" + String jobTemplate = "{\n" +
" \"analysis_config\" : {\n" + " \"analysis_config\" : {\n" +
" \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\"}]\n" + " \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\"}]\n" +
@ -293,6 +293,43 @@ public class MlJobIT extends ESRestTestCase {
assertThat(responseAsString, not(containsString(indexName))); assertThat(responseAsString, not(containsString(indexName)));
} }
public void testCreateJobInSharedIndexUpdatesMapping() throws Exception {
String jobTemplate = "{\n" +
" \"analysis_config\" : {\n" +
" \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"metric\", \"by_field_name\":\"%s\"}]\n" +
" },\n" +
" \"index_name\" : \"shared-index\"}";
String jobId1 = "job-1";
String byFieldName1 = "responsetime";
String jobId2 = "job-2";
String byFieldName2 = "cpu-usage";
String jobConfig = String.format(Locale.ROOT, jobTemplate, byFieldName1);
Response response = client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/" + jobId1, Collections.emptyMap(), new StringEntity(jobConfig));
assertEquals(200, response.getStatusLine().getStatusCode());
// Check the index mapping contains the first by_field_name
response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName("shared-index") + "/_mapping?pretty");
assertEquals(200, response.getStatusLine().getStatusCode());
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(byFieldName1));
assertThat(responseAsString, not(containsString(byFieldName2)));
jobConfig = String.format(Locale.ROOT, jobTemplate, byFieldName2);
response = client().performRequest("put", MachineLearning.BASE_PATH
+ "anomaly_detectors/" + jobId2, Collections.emptyMap(), new StringEntity(jobConfig));
assertEquals(200, response.getStatusLine().getStatusCode());
// Check the index mapping now contains both fields
response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsIndexName("shared-index") + "/_mapping?pretty");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(byFieldName1));
assertThat(responseAsString, containsString(byFieldName2));
}
public void testDeleteJob() throws Exception { public void testDeleteJob() throws Exception {
String jobId = "foo"; String jobId = "foo";
String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId); String indexName = AnomalyDetectorsIndex.jobResultsIndexName(jobId);

View File

@ -127,7 +127,7 @@ public class ElasticsearchMappingsTests extends ESTestCase {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testResultMapping() throws IOException { public void testResultMapping_WithExtraTermFields() throws IOException {
XContentBuilder builder = ElasticsearchMappings.resultsMapping( XContentBuilder builder = ElasticsearchMappings.resultsMapping(
Arrays.asList("instance", AnomalyRecord.ANOMALY_SCORE.getPreferredName())); Arrays.asList("instance", AnomalyRecord.ANOMALY_SCORE.getPreferredName()));
@ -148,4 +148,27 @@ public class ElasticsearchMappingsTests extends ESTestCase {
assertEquals(ElasticsearchMappings.DOUBLE, dataType); assertEquals(ElasticsearchMappings.DOUBLE, dataType);
} }
@SuppressWarnings("unchecked")
public void testTermFieldMapping() throws IOException {
XContentBuilder builder = ElasticsearchMappings.termFieldsMapping(Arrays.asList("apple", "strawberry",
AnomalyRecord.BUCKET_SPAN.getPreferredName()));
XContentParser parser = createParser(builder);
Map<String, Object> properties = (Map<String, Object>) parser.map().get(ElasticsearchMappings.PROPERTIES);
Map<String, Object> instanceMapping = (Map<String, Object>) properties.get("apple");
assertNotNull(instanceMapping);
String dataType = (String)instanceMapping.get(ElasticsearchMappings.TYPE);
assertEquals(ElasticsearchMappings.KEYWORD, dataType);
instanceMapping = (Map<String, Object>) properties.get("strawberry");
assertNotNull(instanceMapping);
dataType = (String)instanceMapping.get(ElasticsearchMappings.TYPE);
assertEquals(ElasticsearchMappings.KEYWORD, dataType);
// check no mapping for the reserved field
instanceMapping = (Map<String, Object>) properties.get(AnomalyRecord.BUCKET_SPAN.getPreferredName());
assertNull(instanceMapping);
}
} }

View File

@ -9,6 +9,8 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.MultiSearchResponse;
@ -20,7 +22,9 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -28,6 +32,8 @@ import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.SearchHitField;
@ -189,8 +195,17 @@ public class JobProviderTests extends ESTestCase {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testCreateJobWithExistingIndex() { public void testCreateJobWithExistingIndex() {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsIndexName("foo"), AnomalyDetectorsIndex.jobResultsIndexName("foo123")); clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsIndexName("foo"), AnomalyDetectorsIndex.jobResultsIndexName("foo123"));
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class);
ImmutableOpenMap<String, MappingMetaData> typeMappings = ImmutableOpenMap.<String, MappingMetaData>of();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings =
ImmutableOpenMap.<String, ImmutableOpenMap<String, MappingMetaData>>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), typeMappings).build();
when(getMappingsResponse.mappings()).thenReturn(mappings);
clientBuilder.prepareGetMapping(getMappingsResponse);
Job.Builder job = buildJobBuilder("foo123"); Job.Builder job = buildJobBuilder("foo123");
job.setResultsIndexName("foo"); job.setResultsIndexName("foo");
@ -207,27 +222,28 @@ public class JobProviderTests extends ESTestCase {
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder() ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build(); .fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), indexMetaData).build();
ClusterState cs2 = ClusterState.builder(new ClusterName("_name")) ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build();
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
doAnswer(invocationOnMock -> { doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1]; AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs2); task.execute(cs);
return null; return null;
}).when(clusterService).submitStateUpdateTask(eq("put-job-foo123"), any(AckedClusterStateUpdateTask.class)); }).when(clusterService).submitStateUpdateTask(eq("put-job-foo123"), any(AckedClusterStateUpdateTask.class));
doAnswer(invocationOnMock -> { doAnswer(invocationOnMock -> {
AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1]; AckedClusterStateUpdateTask<Boolean> task = (AckedClusterStateUpdateTask<Boolean>) invocationOnMock.getArguments()[1];
task.execute(cs2); task.execute(cs);
return null; return null;
}).when(clusterService).submitStateUpdateTask(eq("index-aliases"), any(AckedClusterStateUpdateTask.class)); }).when(clusterService).submitStateUpdateTask(eq("index-aliases"), any(AckedClusterStateUpdateTask.class));
provider.createJobResultIndex(job.build(), cs2, new ActionListener<Boolean>() { provider.createJobResultIndex(job.build(), cs, new ActionListener<Boolean>() {
@Override @Override
public void onResponse(Boolean aBoolean) { public void onResponse(Boolean aBoolean) {
assertTrue(aBoolean); assertTrue(aBoolean);
verify(clientBuilder.build().admin().indices(), times(1)).preparePutMapping(any());
} }
@Override @Override
@ -288,6 +304,15 @@ public class JobProviderTests extends ESTestCase {
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME); MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class); ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor); clientBuilder.createIndexRequest(AnomalyDetectorsIndex.jobResultsIndexName("foo"), captor);
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class);
ImmutableOpenMap<String, MappingMetaData> typeMappings = ImmutableOpenMap.of();
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings =
ImmutableOpenMap.<String, ImmutableOpenMap<String, MappingMetaData>>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("foo"), typeMappings).build();
when(getMappingsResponse.mappings()).thenReturn(mappings);
clientBuilder.prepareGetMapping(getMappingsResponse);
Job.Builder job = buildJobBuilder("foo"); Job.Builder job = buildJobBuilder("foo");
job.setResultsIndexName("foo"); job.setResultsIndexName("foo");
@ -319,6 +344,7 @@ public class JobProviderTests extends ESTestCase {
@Override @Override
public void onResponse(Boolean aBoolean) { public void onResponse(Boolean aBoolean) {
verify(client.admin().indices(), never()).prepareAliases(); verify(client.admin().indices(), never()).prepareAliases();
verify(clientBuilder.build().admin().indices(), times(1)).preparePutMapping(any());
} }
@Override @Override
@ -422,18 +448,8 @@ public class JobProviderTests extends ESTestCase {
JobProvider provider = createProvider(client); JobProvider provider = createProvider(client);
AtomicReference<Boolean> resultHolder = new AtomicReference<>(); AtomicReference<Boolean> resultHolder = new AtomicReference<>();
Index index = mock(Index.class);
when(index.getName()).thenReturn(AnomalyDetectorsIndex.jobResultsIndexName("marscapone"));
IndexMetaData indexMetaData = mock(IndexMetaData.class);
when(indexMetaData.getIndex()).thenReturn(index);
ImmutableOpenMap<String, AliasMetaData> aliases = ImmutableOpenMap.of();
when(indexMetaData.getAliases()).thenReturn(aliases);
ImmutableOpenMap<String, IndexMetaData> indexMap = ImmutableOpenMap.<String, IndexMetaData>builder()
.fPut(AnomalyDetectorsIndex.jobResultsIndexName("marscapone"), indexMetaData).build();
ClusterState cs = ClusterState.builder(new ClusterName("_name")) ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA).indices(indexMap)).build(); .metaData(MetaData.builder().putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA)).build();
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
@ -1291,7 +1307,10 @@ public class JobProviderTests extends ESTestCase {
} }
private JobProvider createProvider(Client client) { private JobProvider createProvider(Client client) {
return new JobProvider(client, 0, TimeValue.timeValueSeconds(1)); Settings.Builder builder = Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), 1000L);
return new JobProvider(client, 0, builder.build());
} }
private static GetResponse createGetResponse(boolean exists, Map<String, Object> source) throws IOException { private static GetResponse createGetResponse(boolean exists, Map<String, Object> source) throws IOException {

View File

@ -20,6 +20,10 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
@ -31,8 +35,6 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient; import org.elasticsearch.client.ClusterAdminClient;
@ -41,7 +43,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.sort.SortBuilder; import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.DeleteJobAction;
@ -312,30 +313,42 @@ public class MockClientBuilder {
return this; return this;
} }
public MockClientBuilder prepareUpdateScript(String index, String type, String id, ArgumentCaptor<Script> getSource, public MockClientBuilder preparePutMapping(PutMappingResponse response, String type) {
ArgumentCaptor<Map<String, Object>> getParams) { PutMappingRequestBuilder requestBuilder = mock(PutMappingRequestBuilder.class);
UpdateRequestBuilder builder = mock(UpdateRequestBuilder.class); when(requestBuilder.setType(eq(type))).thenReturn(requestBuilder);
when(client.prepareUpdate(index, type, id)).thenReturn(builder); when(requestBuilder.setSource(any(XContentBuilder.class))).thenReturn(requestBuilder);
when(builder.setScript(getSource.capture())).thenReturn(builder); doAnswer(new Answer<Void>() {
when(builder.setUpsert(getParams.capture())).thenReturn(builder); @Override
when(builder.setRetryOnConflict(any(int.class))).thenReturn(builder); public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
when(builder.get()).thenReturn(mock(UpdateResponse.class)); ActionListener<PutMappingResponse> listener =
(ActionListener<PutMappingResponse>) invocationOnMock.getArguments()[0];
listener.onResponse(response);
return null;
}
}).when(requestBuilder).execute(any());
when(indicesAdminClient.preparePutMapping(any())).thenReturn(requestBuilder);
return this; return this;
} }
public MockClientBuilder prepareUpdateScript(String index, String type, String id, ArgumentCaptor<Script> getSource, public MockClientBuilder prepareGetMapping(GetMappingsResponse response) {
ArgumentCaptor<Map<String, Object>> getParams, Exception e) { GetMappingsRequestBuilder builder = mock(GetMappingsRequestBuilder.class);
UpdateRequestBuilder builder = mock(UpdateRequestBuilder.class);
when(client.prepareUpdate(index, type, id)).thenReturn(builder); doAnswer(new Answer<Void>() {
when(builder.setScript(getSource.capture())).thenReturn(builder); @Override
when(builder.setUpsert(getParams.capture())).thenReturn(builder); public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
when(builder.setRetryOnConflict(any(int.class))).thenReturn(builder); ActionListener<GetMappingsResponse> listener =
doAnswer(invocation -> { (ActionListener<GetMappingsResponse>) invocationOnMock.getArguments()[0];
throw e; listener.onResponse(response);
}).when(builder).get(); return null;
}
}).when(builder).execute(any());
when(indicesAdminClient.prepareGetMappings(any())).thenReturn(builder);
return this; return this;
} }
public Client build() { public Client build() {
return client; return client;
} }