[ML] Add job_id filter to job alias

This fixes returning results only for the job asked
when a shared index is used.

The commit also refactors the field count check to
use the field mappings API and solves a bug where the
check blows due to the _default_ type not having properties.

[Zach] Minor test tweak to MlJobIT

Original commit: elastic/x-pack-elasticsearch@729f886879
This commit is contained in:
Dimitrios Athanasiou 2017-03-02 19:09:32 +00:00 committed by Zachary Tong
parent 34a4778b11
commit c52689a9a0
4 changed files with 78 additions and 81 deletions

View File

@ -14,7 +14,7 @@ import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
@ -27,11 +27,9 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -107,7 +105,6 @@ public class JobProvider {
private static final int RECORDS_SIZE_PARAM = 500;
private final Client client;
private final Settings settings;
@ -122,49 +119,46 @@ public class JobProvider {
public void createJobResultIndex(Job job, ClusterState state, ActionListener<Boolean> listener) {
Collection<String> termFields = (job.getAnalysisConfig() != null) ? job.getAnalysisConfig().termFields() : Collections.emptyList();
String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId());
String indexName = job.getResultsIndexName();
String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName(job.getId());
String indexName = job.getResultsIndexName();
final ActionListener<Boolean> responseListener = listener;
listener = ActionListener.wrap(aBoolean -> {
client.admin().indices().prepareAliases()
.addAlias(indexName, aliasName, QueryBuilders.termQuery(Job.ID.getPreferredName(), job.getId()))
.execute(ActionListener.wrap(r -> responseListener.onResponse(true), responseListener::onFailure));
},
listener::onFailure);
// Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if
// already in the CS
if (!state.getMetaData().hasIndex(indexName)) {
LOGGER.trace("ES API CALL: create index {}", indexName);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
final ActionListener<Boolean> responseListener = listener;
listener = ActionListener.wrap(aBoolean -> {
client.admin().indices().prepareAliases()
.addAlias(indexName, aliasName)
.execute(ActionListener.wrap(r -> responseListener.onResponse(true), responseListener::onFailure));
},
listener::onFailure);
// Indices can be shared, so only create if it doesn't exist already. Saves us a roundtrip if
// already in the CS
if (!state.getMetaData().hasIndex(indexName)) {
LOGGER.trace("ES API CALL: create index {}", indexName);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
final ActionListener<Boolean> createdListener = listener;
client.admin().indices().create(createIndexRequest,
ActionListener.wrap(
r -> updateIndexMappingWithTermFields(indexName, termFields, createdListener),
e -> {
// Possible that the index was created while the request was executing,
// so we need to handle that possibility
if (e instanceof ResourceAlreadyExistsException) {
LOGGER.info("Index already exists");
// Create the alias
createdListener.onResponse(true);
} else {
createdListener.onFailure(e);
}
final ActionListener<Boolean> createdListener = listener;
client.admin().indices().create(createIndexRequest,
ActionListener.wrap(
r -> updateIndexMappingWithTermFields(indexName, termFields, createdListener),
e -> {
// Possible that the index was created while the request was executing,
// so we need to handle that possibility
if (e instanceof ResourceAlreadyExistsException) {
LOGGER.info("Index already exists");
// Create the alias
createdListener.onResponse(true);
} else {
createdListener.onFailure(e);
}
));
} else {
// Add the job's term fields to the index mapping
final ActionListener<Boolean> updateMappingListener = listener;
checkNumberOfFieldsLimit(indexName, termFields.size(), ActionListener.wrap(
r -> updateIndexMappingWithTermFields(indexName, termFields, updateMappingListener),
updateMappingListener::onFailure));
}
}
));
} else {
// Add the job's term fields to the index mapping
final ActionListener<Boolean> updateMappingListener = listener;
checkNumberOfFieldsLimit(indexName, termFields.size(), ActionListener.wrap(
r -> updateIndexMappingWithTermFields(indexName, termFields, updateMappingListener),
updateMappingListener::onFailure));
}
}
private void updateIndexMappingWithTermFields(String indexName, Collection<String> termFields, ActionListener<Boolean> listener) {
@ -188,37 +182,36 @@ public class JobProvider {
}
private void checkNumberOfFieldsLimit(String indexName, long additionalFieldCount, ActionListener<Boolean> listener) {
client.admin().indices().prepareGetMappings(indexName).execute(new ActionListener<GetMappingsResponse>() {
@Override
public void onResponse(GetMappingsResponse getMappingsResponse) {
ImmutableOpenMap<String, MappingMetaData> typeMappings = getMappingsResponse.mappings().get(indexName);
Iterator<com.carrotsearch.hppc.cursors.ObjectObjectCursor<String, MappingMetaData>> iter = typeMappings.iterator();
long numFields = 0;
try {
while (iter.hasNext()) {
Map<String, Object> props = (Map<String, Object>)iter.next().value.getSourceAsMap().get("properties");
numFields += props.size();
client.admin().indices().prepareGetFieldMappings(indexName).setTypes("*").setFields("*").execute(
new ActionListener<GetFieldMappingsResponse>() {
@Override
public void onResponse(GetFieldMappingsResponse getFieldMappingsResponse) {
long numFields = 0;
Map<String, Map<String, Map<String, GetFieldMappingsResponse.FieldMappingMetaData>>> indexMappings =
getFieldMappingsResponse.mappings();
for (String index : indexMappings.keySet()) {
Map<String, Map<String, GetFieldMappingsResponse.FieldMappingMetaData>> typeMappings = indexMappings.get(index);
for (String type : typeMappings.keySet()) {
Map<String, GetFieldMappingsResponse.FieldMappingMetaData> fieldMappings = typeMappings.get(type);
numFields += fieldMappings.size();
}
}
long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings);
if (numFields + additionalFieldCount > fieldCountLimit) {
String message = "Cannot create job in index '" + indexName + "' as the " +
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated";
listener.onFailure(new IllegalArgumentException(message));
} else {
listener.onResponse(true);
}
}
}
catch (IOException e) {
listener.onFailure(e);
}
long fieldCountLimit = MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.get(settings);
if (numFields + additionalFieldCount > fieldCountLimit) {
String message = "Cannot create job in index '" + indexName + "' as the " +
MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey() + " setting will be violated";
listener.onFailure(new IllegalArgumentException(message));
} else {
listener.onResponse(true);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
/**
@ -792,7 +785,7 @@ public class JobProvider {
* @param size number of snapshots to retrieve
*/
public void modelSnapshots(String jobId, int from, int size, Consumer<QueryPage<ModelSnapshot>> handler,
Consumer<Exception> errorHandler) {
Consumer<Exception> errorHandler) {
modelSnapshots(jobId, from, size, null, true, QueryBuilders.matchAllQuery(), handler, errorHandler);
}

View File

@ -259,7 +259,8 @@ public class MlJobIT extends ESRestTestCase {
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName("custom-" + indexName)
+ "\":{\"aliases\":{\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "\":{},\"" +
+ "\":{\"aliases\":{\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId1)
+ "\":{\"filter\":{\"term\":{\"job_id\":{\"value\":\"" + jobId1 + "\",\"boost\":1.0}}}},\"" +
AnomalyDetectorsIndex.jobResultsAliasedName(jobId2)));
response = client().performRequest("get", "_cat/indices");

View File

@ -31,14 +31,15 @@ import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.CategorizerState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
@ -73,7 +74,6 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -131,11 +131,12 @@ public class JobProviderTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void testCreateJobResultsIndex() {
String resultsIndexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndex.RESULTS_INDEX_DEFAULT;
QueryBuilder jobFilter = QueryBuilders.termQuery("job_id", "foo");
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.createIndexRequest(resultsIndexName, captor);
clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.jobResultsAliasedName("foo"));
clientBuilder.prepareAlias(resultsIndexName, AnomalyDetectorsIndex.jobResultsAliasedName("foo"), jobFilter);
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
Job.Builder job = buildJobBuilder("foo");
@ -175,9 +176,10 @@ public class JobProviderTests extends ESTestCase {
@SuppressWarnings("unchecked")
public void testCreateJobWithExistingIndex() {
QueryBuilder jobFilter = QueryBuilders.termQuery("job_id", "foo");
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
clientBuilder.prepareAlias(AnomalyDetectorsIndex.jobResultsAliasedName("foo"),
AnomalyDetectorsIndex.jobResultsAliasedName("foo123"));
AnomalyDetectorsIndex.jobResultsAliasedName("foo123"), jobFilter);
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class);
@ -239,11 +241,12 @@ public class JobProviderTests extends ESTestCase {
public void testCreateJobRelatedIndicies_createsAliasBecauseIndexNameIsSet() {
String indexName = AnomalyDetectorsIndex.RESULTS_INDEX_PREFIX + "custom-bar";
String aliasName = AnomalyDetectorsIndex.jobResultsAliasedName("foo");
QueryBuilder jobFilter = QueryBuilders.termQuery("job_id", "foo");
MockClientBuilder clientBuilder = new MockClientBuilder(CLUSTER_NAME);
ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
clientBuilder.createIndexRequest(indexName, captor);
clientBuilder.prepareAlias(indexName, aliasName);
clientBuilder.prepareAlias(indexName, aliasName, jobFilter);
clientBuilder.preparePutMapping(mock(PutMappingResponse.class), Result.TYPE.getPreferredName());
Job.Builder job = buildJobBuilder("foo");

View File

@ -281,9 +281,9 @@ public class MockClientBuilder {
}
@SuppressWarnings("unchecked")
public MockClientBuilder prepareAlias(String indexName, String alias) {
public MockClientBuilder prepareAlias(String indexName, String alias, QueryBuilder filter) {
IndicesAliasesRequestBuilder aliasesRequestBuilder = mock(IndicesAliasesRequestBuilder.class);
when(aliasesRequestBuilder.addAlias(eq(indexName), eq(alias))).thenReturn(aliasesRequestBuilder);
when(aliasesRequestBuilder.addAlias(eq(indexName), eq(alias), eq(filter))).thenReturn(aliasesRequestBuilder);
when(indicesAdminClient.prepareAliases()).thenReturn(aliasesRequestBuilder);
doAnswer(new Answer<Void>() {
@Override