Update .ml-config mappings before indexing job, datafeed or df analytics config (#44216) (#44273)

Przemysław Witek 2019-07-12 16:49:48 +02:00 committed by GitHub
parent 9b4f50b40a
commit dd5f4ae00e
5 changed files with 165 additions and 19 deletions
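
In short: before a job, datafeed, or data frame analytics config document is indexed into the .ml-config index, the index mappings are now brought up to date via ElasticsearchMappings.addDocMappingIfMissing. A minimal sketch of the pattern this commit introduces, condensed from the updateDocMappingAndPutConfig helper added below (the client, clusterService, configProvider and logger fields are those of the transport action in this diff):

    // Sketch only: condensed from the helper added in TransportPutDataFrameAnalyticsAction below.
    private void updateDocMappingAndPutConfig(DataFrameAnalyticsConfig config,
                                              Map<String, String> headers,
                                              ActionListener<IndexResponse> listener) {
        ClusterState clusterState = clusterService.state();
        if (clusterState == null) {
            // Without cluster state the current mappings cannot be inspected; index the config as before.
            logger.warn("Cannot update doc mapping because clusterState == null");
            configProvider.put(config, headers, listener);
            return;
        }
        ElasticsearchMappings.addDocMappingIfMissing(
            AnomalyDetectorsIndex.configIndexName(),   // the .ml-config index
            ElasticsearchMappings::configMapping,      // CheckedFunction<String, XContentBuilder, IOException>
            client,
            clusterState,
            ActionListener.wrap(
                unused -> configProvider.put(config, headers, listener),   // mappings are current, index the doc
                listener::onFailure));
    }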

ElasticsearchMappings.java (View File)

@@ -18,7 +18,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
@@ -140,9 +140,13 @@ public class ElasticsearchMappings {
}
public static XContentBuilder configMapping() throws IOException {
return configMapping(SINGLE_MAPPING_NAME);
}
public static XContentBuilder configMapping(String mappingType) throws IOException {
XContentBuilder builder = jsonBuilder();
builder.startObject();
builder.startObject(SINGLE_MAPPING_NAME);
builder.startObject(mappingType);
addMetaInformation(builder);
addDefaultMapping(builder);
builder.startObject(PROPERTIES);
@@ -1146,7 +1150,7 @@ public class ElasticsearchMappings {
}
public static void addDocMappingIfMissing(String alias,
CheckedBiFunction<String, Collection<String>, XContentBuilder, IOException> mappingSupplier,
CheckedFunction<String, XContentBuilder, IOException> mappingSupplier,
Client client, ClusterState state, ActionListener<Boolean> listener) {
AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias);
if (aliasOrIndex == null) {
@@ -1170,7 +1174,7 @@ public class ElasticsearchMappings {
IndexMetaData indexMetaData = state.metaData().index(indicesThatRequireAnUpdate[0]);
String mappingType = indexMetaData.mapping().type();
try (XContentBuilder mapping = mappingSupplier.apply(mappingType, Collections.emptyList())) {
try (XContentBuilder mapping = mappingSupplier.apply(mappingType)) {
PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate);
putMappingRequest.type(mappingType);
putMappingRequest.source(mapping);
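
The supplier passed to addDocMappingIfMissing shrinks from a CheckedBiFunction (mapping type plus a collection of extra terms) to a CheckedFunction that takes only the mapping type, and configMapping gains a matching String overload. Any mapping supplier now just has to fit that shape; a minimal example (the field name and type are placeholders, mirroring the fakeMapping helper in the test below):

    // Hypothetical supplier matching the new CheckedFunction<String, XContentBuilder, IOException> signature.
    private static XContentBuilder exampleMapping(String mappingType) throws IOException {
        return jsonBuilder()
            .startObject()
                .startObject(mappingType)                          // "_doc" on 7.x indices
                    .startObject(ElasticsearchMappings.PROPERTIES)
                        .startObject("some-field")
                            .field(ElasticsearchMappings.TYPE, ElasticsearchMappings.LONG)
                        .endObject()
                    .endObject()
                .endObject()
            .endObject();
    }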

ElasticsearchMappingsTests.java (View File)

@@ -10,6 +10,11 @@ import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -17,11 +22,13 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -35,6 +42,7 @@ import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.core.ml.job.results.ReservedFieldNames;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.mockito.ArgumentCaptor;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
@@ -48,7 +56,16 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class ElasticsearchMappingsTests extends ESTestCase {
@@ -207,6 +224,54 @@ public class ElasticsearchMappingsTests extends ESTestCase {
ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion()));
}
public void testAddDocMappingIfMissing() throws IOException {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
Client client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
doAnswer(
invocationOnMock -> {
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onResponse(new AcknowledgedResponse(true));
return null;
})
.when(client).execute(eq(PutMappingAction.INSTANCE), any(), any(ActionListener.class));
ClusterState clusterState = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("index-name", "0.0"));
ElasticsearchMappings.addDocMappingIfMissing(
"index-name",
ElasticsearchMappingsTests::fakeMapping,
client,
clusterState,
ActionListener.wrap(
ok -> assertTrue(ok),
e -> fail(e.toString())
)
);
ArgumentCaptor<PutMappingRequest> requestCaptor = ArgumentCaptor.forClass(PutMappingRequest.class);
verify(client).threadPool();
verify(client).execute(eq(PutMappingAction.INSTANCE), requestCaptor.capture(), any(ActionListener.class));
verifyNoMoreInteractions(client);
PutMappingRequest request = requestCaptor.getValue();
assertThat(request.type(), equalTo("_doc"));
assertThat(request.indices(), equalTo(new String[] { "index-name" }));
assertThat(request.source(), equalTo("{\"_doc\":{\"properties\":{\"some-field\":{\"type\":\"long\"}}}}"));
}
private static XContentBuilder fakeMapping(String mappingType) throws IOException {
return jsonBuilder()
.startObject()
.startObject(mappingType)
.startObject(ElasticsearchMappings.PROPERTIES)
.startObject("some-field")
.field(ElasticsearchMappings.TYPE, ElasticsearchMappings.LONG)
.endObject()
.endObject()
.endObject()
.endObject();
}
private ClusterState getClusterStateWithMappingsWithMetaData(Map<String, Object> namesAndVersions) throws IOException {
MetaData.Builder metaDataBuilder = MetaData.builder();
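
As the helper's name suggests, getClusterStateWithMappingsWithMetaData maps each index to a mapping _meta version, so "0.0" marks index-name as out of date and a PutMappingRequest is expected. The source string asserted on is simply the compact JSON rendering of the fakeMapping builder; for reference (sketch, not part of the commit):

    // Rendering the fake mapping to the JSON string the test asserts on.
    XContentBuilder mapping = fakeMapping(SINGLE_MAPPING_NAME);   // SINGLE_MAPPING_NAME == "_doc"
    String json = Strings.toString(mapping);
    // json is {"_doc":{"properties":{"some-field":{"type":"long"}}}}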

TransportPutDataFrameAnalyticsAction.java (View File)

@@ -5,11 +5,15 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@@ -29,6 +33,8 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.security.SecurityContext;
@@ -43,12 +49,15 @@ import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfig
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
public class TransportPutDataFrameAnalyticsAction
extends HandledTransportAction<PutDataFrameAnalyticsAction.Request, PutDataFrameAnalyticsAction.Response> {
private static final Logger logger = LogManager.getLogger(TransportPutDataFrameAnalyticsAction.class);
private final XPackLicenseState licenseState;
private final DataFrameAnalyticsConfigProvider configProvider;
private final ThreadPool threadPool;
@@ -97,6 +106,7 @@ public class TransportPutDataFrameAnalyticsAction
.setCreateTime(Instant.now())
.setVersion(Version.CURRENT)
.build();
if (licenseState.isAuthAllowed()) {
final String username = securityContext.getUser().principal();
RoleDescriptor.IndicesPrivileges sourceIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
@@ -120,7 +130,10 @@ public class TransportPutDataFrameAnalyticsAction
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
} else {
configProvider.put(memoryCappedConfig, threadPool.getThreadContext().getHeaders(), ActionListener.wrap(
updateDocMappingAndPutConfig(
memoryCappedConfig,
threadPool.getThreadContext().getHeaders(),
ActionListener.wrap(
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
listener::onFailure
));
@@ -131,7 +144,10 @@ public class TransportPutDataFrameAnalyticsAction
HasPrivilegesResponse response,
ActionListener<PutDataFrameAnalyticsAction.Response> listener) throws IOException {
if (response.isCompleteMatch()) {
configProvider.put(memoryCappedConfig, threadPool.getThreadContext().getHeaders(), ActionListener.wrap(
updateDocMappingAndPutConfig(
memoryCappedConfig,
threadPool.getThreadContext().getHeaders(),
ActionListener.wrap(
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
listener::onFailure
));
@@ -150,6 +166,25 @@ public class TransportPutDataFrameAnalyticsAction
}
}
private void updateDocMappingAndPutConfig(DataFrameAnalyticsConfig config,
Map<String, String> headers,
ActionListener<IndexResponse> listener) {
ClusterState clusterState = clusterService.state();
if (clusterState == null) {
logger.warn("Cannot update doc mapping because clusterState == null");
configProvider.put(config, headers, listener);
return;
}
ElasticsearchMappings.addDocMappingIfMissing(
AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings::configMapping,
client,
clusterState,
ActionListener.wrap(
unused -> configProvider.put(config, headers, listener),
listener::onFailure));
}
private void validateConfig(DataFrameAnalyticsConfig config) {
if (MlStrings.isValidId(config.getId()) == false) {
throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.INVALID_ID, DataFrameAnalyticsConfig.ID,
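
For readers less familiar with the listener utilities: ActionListener.wrap(onResponse, onFailure) used in updateDocMappingAndPutConfig builds a listener from two lambdas, and additionally routes exceptions thrown by the response handler to the failure handler. The chaining above is roughly equivalent to this expansion (illustration only; variable names are those of the method above):

    // Roughly equivalent expansion of the listener built in updateDocMappingAndPutConfig.
    ActionListener<Boolean> mappingUpdatedListener = new ActionListener<Boolean>() {
        @Override
        public void onResponse(Boolean unused) {
            // .ml-config mappings are now up to date (or already were); index the analytics config.
            configProvider.put(config, headers, listener);
        }

        @Override
        public void onFailure(Exception e) {
            listener.onFailure(e);
        }
    };
    ElasticsearchMappings.addDocMappingIfMissing(
        AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings::configMapping,
        client, clusterState, mappingUpdatedListener);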

TransportPutDatafeedAction.java (View File)

@@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchAction;
@@ -36,6 +38,8 @@ import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
@@ -58,6 +62,8 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDatafeedAction.Request, PutDatafeedAction.Response> {
private static final Logger logger = LogManager.getLogger(TransportPutDatafeedAction.class);
private final XPackLicenseState licenseState;
private final Client client;
private final SecurityContext securityContext;
@@ -111,7 +117,7 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDat
.indices(indices);
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
r -> handlePrivsResponse(username, request, r, listener),
r -> handlePrivsResponse(username, request, r, state, listener),
listener::onFailure);
ActionListener<GetRollupIndexCapsAction.Response> getRollupIndexCapsActionHandler = ActionListener.wrap(
@@ -145,15 +151,17 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDat
}
} else {
putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener);
putDatafeed(request, threadPool.getThreadContext().getHeaders(), state, listener);
}
}
private void handlePrivsResponse(String username, PutDatafeedAction.Request request,
private void handlePrivsResponse(String username,
PutDatafeedAction.Request request,
HasPrivilegesResponse response,
ClusterState clusterState,
ActionListener<PutDatafeedAction.Response> listener) throws IOException {
if (response.isCompleteMatch()) {
putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener);
putDatafeed(request, threadPool.getThreadContext().getHeaders(), clusterState, listener);
} else {
XContentBuilder builder = JsonXContent.contentBuilder();
builder.startObject();
@@ -169,7 +177,9 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDat
}
}
private void putDatafeed(PutDatafeedAction.Request request, Map<String, String> headers,
private void putDatafeed(PutDatafeedAction.Request request,
Map<String, String> headers,
ClusterState clusterState,
ActionListener<PutDatafeedAction.Response> listener) {
String datafeedId = request.getDatafeed().getId();
@@ -181,13 +191,30 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDat
}
DatafeedConfig.validateAggregations(request.getDatafeed().getParsedAggregations(xContentRegistry));
CheckedConsumer<Boolean, Exception> validationOk = ok -> {
datafeedConfigProvider.putDatafeedConfig(request.getDatafeed(), headers, ActionListener.wrap(
CheckedConsumer<Boolean, Exception> mappingsUpdated = ok -> {
datafeedConfigProvider.putDatafeedConfig(
request.getDatafeed(),
headers,
ActionListener.wrap(
indexResponse -> listener.onResponse(new PutDatafeedAction.Response(request.getDatafeed())),
listener::onFailure
));
};
CheckedConsumer<Boolean, Exception> validationOk = ok -> {
if (clusterState == null) {
logger.warn("Cannot update doc mapping because clusterState == null");
mappingsUpdated.accept(false);
return;
}
ElasticsearchMappings.addDocMappingIfMissing(
AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings::configMapping,
client,
clusterState,
ActionListener.wrap(mappingsUpdated, listener::onFailure));
};
CheckedConsumer<Boolean, Exception> jobOk = ok ->
jobConfigProvider.validateDatafeedJob(request.getDatafeed(), ActionListener.wrap(validationOk, listener::onFailure));
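
The CheckedConsumers above chain back-to-front, so the mapping update slots in between datafeed validation and indexing. Read in execution order (comment sketch covering only the steps visible in this hunk; error paths elided):

    // jobOk           -> jobConfigProvider.validateDatafeedJob(request.getDatafeed(), ...)
    // validationOk    -> ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.configIndexName(), ...)
    //                    (or mappingsUpdated.accept(false) directly when clusterState == null)
    // mappingsUpdated -> datafeedConfigProvider.putDatafeedConfig(request.getDatafeed(), headers, ...)
    //                    -> listener.onResponse(new PutDatafeedAction.Response(request.getDatafeed()))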

JobManager.java (View File)

@@ -45,6 +45,8 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -256,7 +258,7 @@ public class JobManager {
ActionListener<Boolean> putJobListener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean indicesCreated) {
public void onResponse(Boolean mappingsUpdated) {
jobConfigProvider.putJob(job, ActionListener.wrap(
response -> {
@@ -283,10 +285,23 @@ public class JobManager {
}
};
ActionListener<Boolean> addDocMappingsListener = ActionListener.wrap(
indicesCreated -> {
if (state == null) {
logger.warn("Cannot update doc mapping because clusterState == null");
putJobListener.onResponse(false);
return;
}
ElasticsearchMappings.addDocMappingIfMissing(
AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings::configMapping, client, state, putJobListener);
},
putJobListener::onFailure
);
ActionListener<List<String>> checkForLeftOverDocs = ActionListener.wrap(
matchedIds -> {
if (matchedIds.isEmpty()) {
jobResultsProvider.createJobResultIndex(job, state, putJobListener);
jobResultsProvider.createJobResultIndex(job, state, addDocMappingsListener);
} else {
// A job has the same Id as one of the group names
// error with the first in the list
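
The same pattern in JobManager.putJob: the new addDocMappingsListener sits between result-index creation and indexing the job config, falling back to putJobListener.onResponse(false) when no cluster state is available. Execution order (comment sketch derived from the code above):

    // checkForLeftOverDocs   -> jobResultsProvider.createJobResultIndex(job, state, addDocMappingsListener)
    // addDocMappingsListener -> ElasticsearchMappings.addDocMappingIfMissing(
    //                               AnomalyDetectorsIndex.configIndexName(),
    //                               ElasticsearchMappings::configMapping, client, state, putJobListener)
    // putJobListener         -> jobConfigProvider.putJob(job, ...)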