[ML] Simplify JobManager#createJobResultIndex(...)

Original commit: elastic/x-pack-elasticsearch@e8b01e183c
This commit is contained in:
Martijn van Groningen 2017-03-03 13:32:14 +01:00
parent 9e7d6d7fee
commit 49826ae134
7 changed files with 154 additions and 76 deletions

View File

@ -268,7 +268,7 @@ public class MachineLearning extends Plugin implements ActionPlugin {
} }
JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client); JobResultsPersister jobResultsPersister = new JobResultsPersister(settings, client);
JobProvider jobProvider = new JobProvider(client, 1, settings); JobProvider jobProvider = new JobProvider(client, settings);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client); JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(settings, client);
Auditor auditor = new Auditor(client, clusterService); Auditor auditor = new Auditor(client, clusterService);

View File

@ -264,10 +264,22 @@ public class ElasticsearchMappings {
return builder; return builder;
} }
public static XContentBuilder termFieldsMapping(Collection<String> termFields) throws IOException { static XContentBuilder termFieldsMapping(String type, Collection<String> termFields) {
XContentBuilder builder = jsonBuilder().startObject().startObject(PROPERTIES); try {
addTermFields(builder, termFields); XContentBuilder builder = jsonBuilder().startObject();
return builder.endObject().endObject(); if (type != null) {
builder.startObject(type);
}
builder.startObject(PROPERTIES);
addTermFields(builder, termFields);
builder.endObject();
if (type != null) {
builder.endObject();
}
return builder.endObject();
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
private static void addTermFields(XContentBuilder builder, Collection<String> termFields) throws IOException { private static void addTermFields(XContentBuilder builder, Collection<String> termFields) throws IOException {

View File

@ -14,7 +14,6 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
@ -27,6 +26,8 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
@ -108,7 +109,7 @@ public class JobProvider {
private final Client client; private final Client client;
private final Settings settings; private final Settings settings;
public JobProvider(Client client, int numberOfReplicas, Settings settings) { public JobProvider(Client client, Settings settings) {
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.settings = settings; this.settings = settings;
} }
@ -116,95 +117,105 @@ public class JobProvider {
/** /**
* Create the Elasticsearch index and the mappings * Create the Elasticsearch index and the mappings
*/ */
public void createJobResultIndex(Job job, ClusterState state, ActionListener<Boolean> listener) { public void createJobResultIndex(Job job, ClusterState state, final ActionListener<Boolean> finalListener) {
Collection<String> termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList(); Collection<String> termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList();
String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()); String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId());
String indexName = job.getResultsIndexName(); String indexName = job.getResultsIndexName();
final ActionListener<Boolean> responseListener = listener; final ActionListener<Boolean> createAliasListener = ActionListener.wrap(success -> {
listener = ActionListener.wrap(aBoolean -> {
client.admin().indices().prepareAliases() client.admin().indices().prepareAliases()
.addAlias(indexName, aliasName, QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId())) .addAlias(indexName, aliasName, QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId()))
.execute(ActionListener.wrap(r -> responseListener.onResponse(true), responseListener::onFailure)); // we could return 'sucess && r.isAcknowledged()' instead of 'true', but that makes
// testing not possible as we can't create IndicesAliasesResponse instance or
// mock IndicesAliasesResponse#isAcknowledged()
.execute(ActionListener.wrap(r -> finalListener.onResponse(true),
finalListener::onFailure));
}, },
listener::onFailure); finalListener::onFailure);
// Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if // Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if
// already in the CS // already in the CS
if (!state.getMetaData().hasIndex(indexName)) { if (!state.getMetaData().hasIndex(indexName)) {
LOGGER.trace("ES API CALL: create index {}", indexName); LOGGER.trace("ES API CALL: create index {}", indexName);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
String type = Result.TYPE.getPreferredName();
final ActionListener<Boolean> createdListener = listener; createIndexRequest.mapping(type, ElasticsearchMappings.termFieldsMapping(type, termFields));
client.admin().indices().create(createIndexRequest, client.admin().indices().create(createIndexRequest,
ActionListener.wrap( ActionListener.wrap(
r -> updateIndexMappingWithTermFields(indexName, termFields, createdListener), r -> createAliasListener.onResponse(r.isAcknowledged()),
e -> { e -> {
// Possible that the index was created while the request was executing, // Possible that the index was created while the request was executing,
// so we need to handle that possibility // so we need to handle that possibility
if (e instanceof ResourceAlreadyExistsException) { if (e instanceof ResourceAlreadyExistsException) {
LOGGER.info("Index already exists"); LOGGER.info("Index already exists");
// Create the alias // Create the alias
createdListener.onResponse(true); createAliasListener.onResponse(true);
} else { } else {
createdListener.onFailure(e); finalListener.onFailure(e);
} }
} }
)); ));
} else { } else {
// Add the job's term fields to the index mapping long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings);
final ActionListener<Boolean> updateMappingListener = listener; if (violatedFieldCountLimit(indexName, termFields.size(), fieldCountLimit, state)) {
checkNumberOfFieldsLimit(indexName, termFields.size(), ActionListener.wrap( String message = "Cannot create job in index '" + indexName + "' as the " +
r -> updateIndexMappingWithTermFields(indexName, termFields, updateMappingListener), MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated";
updateMappingListener::onFailure)); finalListener.onFailure(new IllegalArgumentException(message));
} else {
updateIndexMappingWithTermFields(indexName, termFields,
ActionListener.wrap(createAliasListener::onResponse, finalListener::onFailure));
}
} }
} }
static boolean violatedFieldCountLimit(String indexName, long additionalFieldCount, long fieldCountLimit, ClusterState clusterState) {
long numFields = 0;
IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
Iterator<MappingMetaData> mappings = indexMetaData.getMappings().valuesIt();
while (mappings.hasNext()) {
MappingMetaData mapping = mappings.next();
try {
numFields += countFields(mapping.sourceAsMap());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
if (numFields + additionalFieldCount > fieldCountLimit) {
return true;
} else {
return false;
}
}
@SuppressWarnings("unchecked")
static int countFields(Map<String, Object> mapping) {
Object propertiesNode = mapping.get("properties");
if (propertiesNode != null && propertiesNode instanceof Map) {
mapping = (Map<String, Object>) propertiesNode;
} else {
return 0;
}
int count = 0;
for (Map.Entry<String, Object> entry : mapping.entrySet()) {
if (entry.getValue() instanceof Map) {
Map<String, Object> fieldMapping = (Map<String, Object>) entry.getValue();
// take into account object and nested fields:
count += countFields(fieldMapping);
}
count++;
}
return count;
}
private void updateIndexMappingWithTermFields(String indexName, Collection<String> termFields, ActionListener<Boolean> listener) { private void updateIndexMappingWithTermFields(String indexName, Collection<String> termFields, ActionListener<Boolean> listener) {
try { client.admin().indices().preparePutMapping(indexName).setType(Result.TYPE.getPreferredName())
client.admin().indices().preparePutMapping(indexName).setType(Result.TYPE.getPreferredName()) .setSource(ElasticsearchMappings.termFieldsMapping(null, termFields))
.setSource(ElasticsearchMappings.termFieldsMapping(termFields)) .execute(new ActionListener<PutMappingResponse>() {
.execute(new ActionListener<PutMappingResponse>() {
@Override
public void onResponse(PutMappingResponse putMappingResponse) {
listener.onResponse(true);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} catch (IOException e) {
listener.onFailure(e);
}
}
private void checkNumberOfFieldsLimit(String indexName, long additionalFieldCount, ActionListener<Boolean> listener) {
client.admin().indices().prepareGetFieldMappings(indexName).setTypes("*").setFields("*").execute(
new ActionListener<GetFieldMappingsResponse>() {
@Override @Override
public void onResponse(GetFieldMappingsResponse getFieldMappingsResponse) { public void onResponse(PutMappingResponse putMappingResponse) {
long numFields = 0; listener.onResponse(putMappingResponse.isAcknowledged());
Map<String, Map<String, Map<String, GetFieldMappingsResponse.FieldMappingMetaData>>> indexMappings =
getFieldMappingsResponse.mappings();
for (String index : indexMappings.keySet()) {
Map<String, Map<String, GetFieldMappingsResponse.FieldMappingMetaData>> typeMappings = indexMappings.get(index);
for (String type : typeMappings.keySet()) {
Map<String, GetFieldMappingsResponse.FieldMappingMetaData> fieldMappings = typeMappings.get(type);
numFields += fieldMappings.size();
}
}
long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings);
if (numFields + additionalFieldCount > fieldCountLimit) {
String message = "Cannot create job in index '" + indexName + "' as the " +
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated";
listener.onFailure(new IllegalArgumentException(message));
} else {
listener.onResponse(true);
}
} }
@Override @Override

View File

@ -76,7 +76,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
jobResultsPersister = new JobResultsPersister(nodeSettings(), client()); jobResultsPersister = new JobResultsPersister(nodeSettings(), client());
Settings.Builder builder = Settings.builder() Settings.Builder builder = Settings.builder()
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1)); .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(1));
jobProvider = new JobProvider(client(), 1, builder.build()); jobProvider = new JobProvider(client(), builder.build());
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>(); capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer, jobResultsPersister) { resultProcessor = new AutoDetectResultProcessor(client(), JOB_ID, renormalizer, jobResultsPersister) {
@Override @Override

View File

@ -5,6 +5,10 @@
*/ */
package org.elasticsearch.xpack.ml.job.persistence; package org.elasticsearch.xpack.ml.job.persistence;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -17,18 +21,12 @@ import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.ml.job.results.ReservedFieldNames; import org.elasticsearch.xpack.ml.job.results.ReservedFieldNames;
import org.elasticsearch.xpack.ml.job.results.Result; import org.elasticsearch.xpack.ml.job.results.Result;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -129,7 +127,7 @@ public class ElasticsearchMappingsTests extends ESTestCase {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testTermFieldMapping() throws IOException { public void testTermFieldMapping() throws IOException {
XContentBuilder builder = ElasticsearchMappings.termFieldsMapping(Arrays.asList("apple", "strawberry", XContentBuilder builder = ElasticsearchMappings.termFieldsMapping(null, Arrays.asList("apple", "strawberry",
AnomalyRecord.BUCKET_SPAN.getPreferredName())); AnomalyRecord.BUCKET_SPAN.getPreferredName()));
XContentParser parser = createParser(builder); XContentParser parser = createParser(builder);

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job.persistence; package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@ -137,7 +138,6 @@ public class JobProviderTests extends ESTestCase {
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class); ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.createIndexRequest(resultsIndexName, captor); clientBuilder.createIndexRequest(resultsIndexName, captor);
clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.jobResultsAliasedName("foo"), jobFilter); clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.jobResultsAliasedName("foo"), jobFilter);
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
Job.Builder job = buildJobBuilder("foo"); Job.Builder job = buildJobBuilder("foo");
JobProvider provider = createProvider(clientBuilder.build()); JobProvider provider = createProvider(clientBuilder.build());
@ -1100,6 +1100,63 @@ public class JobProviderTests extends ESTestCase {
assertEquals("{\"modName\":\"modVal2\"}", restoreData[2]); assertEquals("{\"modName\":\"modVal2\"}", restoreData[2]);
} }
public void testViolatedFieldCountLimit() throws Exception {
Map<String, Object> mapping = new HashMap<>();
for (int i = 0; i < 10; i++) {
mapping.put("field" + i, Collections.singletonMap("type", "string"));
}
IndexMetaData.Builder indexMetaData1 = new IndexMetaData.Builder("index1")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
.putMapping(new MappingMetaData("type1", Collections.singletonMap("properties", mapping)));
MetaData metaData = MetaData.builder()
.put(indexMetaData1)
.build();
boolean result = JobProvider.violatedFieldCountLimit("index1", 0, 10,
ClusterState.builder(new ClusterName("_name")).metaData(metaData).build());
assertFalse(result);
result = JobProvider.violatedFieldCountLimit("index1", 1, 10,
ClusterState.builder(new ClusterName("_name")).metaData(metaData).build());
assertTrue(result);
IndexMetaData.Builder indexMetaData2 = new IndexMetaData.Builder("index1")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))
.putMapping(new MappingMetaData("type1", Collections.singletonMap("properties", mapping)))
.putMapping(new MappingMetaData("type2", Collections.singletonMap("properties", mapping)));
metaData = MetaData.builder()
.put(indexMetaData2)
.build();
result = JobProvider.violatedFieldCountLimit("index1", 0, 19,
ClusterState.builder(new ClusterName("_name")).metaData(metaData).build());
assertTrue(result);
}
public void testCountFields() {
Map<String, Object> mapping = new HashMap<>();
mapping.put("field1", Collections.singletonMap("type", "string"));
mapping.put("field2", Collections.singletonMap("type", "string"));
mapping.put("field3", Collections.singletonMap("type", "string"));
assertEquals(3, JobProvider.countFields(Collections.singletonMap("properties", mapping)));
Map<String, Object> objectProperties = new HashMap<>();
objectProperties.put("field4", Collections.singletonMap("type", "string"));
objectProperties.put("field5", Collections.singletonMap("type", "string"));
objectProperties.put("field6", Collections.singletonMap("type", "string"));
Map<String, Object> objectField = new HashMap<>();
objectField.put("type", "object");
objectField.put("properties", objectProperties);
mapping.put("field4", objectField);
assertEquals(7, JobProvider.countFields(Collections.singletonMap("properties", mapping)));
}
private Bucket createBucketAtEpochTime(long epoch) { private Bucket createBucketAtEpochTime(long epoch) {
Bucket b = new Bucket("foo", new Date(epoch), 123); Bucket b = new Bucket("foo", new Date(epoch), 123);
b.setMaxNormalizedProbability(10.0); b.setMaxNormalizedProbability(10.0);
@ -1107,7 +1164,7 @@ public class JobProviderTests extends ESTestCase {
} }
private JobProvider createProvider(Client client) { private JobProvider createProvider(Client client) {
return new JobProvider(client, 0, Settings.EMPTY); return new JobProvider(client, Settings.EMPTY);
} }
private static GetResponse createGetResponse(boolean exists, Map<String, Object> source) throws IOException { private static GetResponse createGetResponse(boolean exists, Map<String, Object> source) throws IOException {

View File

@ -54,7 +54,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
@ -179,7 +178,8 @@ public class MockClientBuilder {
public MockClientBuilder createIndexRequest(String index, ArgumentCaptor<CreateIndexRequest> requestCapture) { public MockClientBuilder createIndexRequest(String index, ArgumentCaptor<CreateIndexRequest> requestCapture) {
doAnswer(invocation -> { doAnswer(invocation -> {
((ActionListener) invocation.getArguments()[1]).onResponse(mock(CreateIndexResponse.class)); CreateIndexResponse response = new CreateIndexResponse(true, true) {};
((ActionListener) invocation.getArguments()[1]).onResponse(response);
return null; return null;
}).when(indicesAdminClient).create(requestCapture.capture(), any(ActionListener.class)); }).when(indicesAdminClient).create(requestCapture.capture(), any(ActionListener.class));
return this; return this;