Add preflight check to dynamic mapping updates (#48867)

Today if the primary discovers that an indexing request needs a mapping update
then it will send it to the master for validation and processing. If, however,
the put-mapping request is invalid then the master still processes it as a
(no-op) cluster state update. When a large number of indexing operations
result in invalid mapping updates, this can overwhelm the master.

However, the primary already has a reasonably up-to-date mapping against which
it can check the (approximate) validity of the put-mapping request before
sending it to the master. For instance it is not possible to remove fields in a
mapping update, so if the primary detects that a mapping update will exceed the
fields limit then it can reject it itself and avoid bothering the master.

This commit adds a pre-flight check to the mapping update path so that the
primary can discard obviously-invalid put-mapping requests itself.

Fixes #35564
Backport of #48817
This commit is contained in:
David Turner 2019-11-05 18:08:22 +01:00 committed by GitHub
parent 24f7d4e83b
commit bd5c6c4779
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 97 additions and 12 deletions

View File

@ -48,16 +48,19 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
@ -260,6 +263,17 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
try {
primary.mapperService().merge(context.getRequestToExecute().type(),
new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS),
MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);
} catch (Exception e) {
logger.info(() -> new ParameterizedMessage("{} mapping update rejected by primary", primary.shardId()), e);
onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
return true;
}
mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
context.getRequestToExecute().type(),
new ActionListener<Void>() {

View File

@ -85,6 +85,10 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
* The reason why a mapping is being merged.
*/
public enum MergeReason {
/**
* Pre-flight check before sending a mapping update to the master
*/
MAPPING_UPDATE_PREFLIGHT,
/**
* Create or update a mapping.
*/
@ -341,6 +345,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
private synchronized Map<String, DocumentMapper> internalMerge(IndexMetaData indexMetaData,
MergeReason reason, boolean onlyUpdateIfNeeded) {
assert reason != MergeReason.MAPPING_UPDATE_PREFLIGHT;
Map<String, CompressedXContent> map = new LinkedHashMap<>();
for (ObjectCursor<MappingMetaData> cursor : indexMetaData.getMappings().values()) {
MappingMetaData mappingMetaData = cursor.value;
@ -494,7 +499,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
ContextMapping.validateContextPaths(indexSettings.getIndexVersionCreated(), fieldMappers, fieldTypes::get);
if (reason == MergeReason.MAPPING_UPDATE) {
if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
// this check is performed on the master node when there is a call to the
// update mapping API, and on the primary when it pre-flights a dynamic
// mapping update before sending it to the master. For all other cases like
// the master node restoring mappings from disk or data nodes
@ -509,7 +514,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
results.put(newMapper.type(), newMapper);
}
if (reason == MergeReason.MAPPING_UPDATE) {
if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
// this check is performed on the master node when there is a call to the
// update mapping API, and on the primary when it pre-flights a dynamic
// mapping update before sending it to the master. For all other cases like
// the master node restoring mappings from disk or data nodes
@ -532,6 +537,10 @@ public class MapperService extends AbstractIndexComponent implements Closeable {
// make structures immutable
results = Collections.unmodifiableMap(results);
if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) {
return results;
}
// only need to immutably rewrap these if the previous reference was changed.
// if not then they are already implicitly immutable.
if (fullPathObjectMappers != this.fullPathObjectMappers) {

View File

@ -44,8 +44,10 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
@ -235,7 +237,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
Engine.IndexResult mappingUpdate =
new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap()));
new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()));
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);
@ -243,6 +245,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
when(shard.shardId()).thenReturn(shardId);
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean()))
.thenReturn(mappingUpdate);
when(shard.mapperService()).thenReturn(mock(MapperService.class));
randomlySetIgnoredPrimaryResponse(items[0]);
@ -770,7 +773,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
"I'm conflicted <(;_;)>");
Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0);
Engine.IndexResult mappingUpdate =
new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap()));
new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()));
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);
@ -787,6 +790,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
});
when(shard.indexSettings()).thenReturn(indexSettings);
when(shard.shardId()).thenReturn(shardId);
when(shard.mapperService()).thenReturn(mock(MapperService.class));
UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(

View File

@ -21,8 +21,14 @@ package org.elasticsearch.index.mapper;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
@ -35,6 +41,8 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
public class DynamicMappingIT extends ESIntegTestCase {
@Override
@ -119,4 +127,39 @@ public class DynamicMappingIT extends ESIntegTestCase {
assertTrue(client().prepareGet("index", "type", Integer.toString(i)).get().isExists());
}
}
/**
 * Checks that an indexing request whose dynamic mapping update would exceed the total-fields limit is rejected with the
 * limit error within the request timeout, even while a cluster state update task is parked on the master node.
 */
public void testPreflightCheckAvoidsMaster() throws InterruptedException {
    // cap the index at two fields in total, then index a first document carrying a single field
    final Settings fieldLimitSettings = Settings.builder().put(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), 2).build();
    createIndex("index", fieldLimitSettings);
    ensureGreen("index");
    client().prepareIndex("index", MapperService.SINGLE_MAPPING_NAME).setId("1").setSource("field1", "value1").get();

    final CountDownLatch taskStarted = new CountDownLatch(1);
    final CountDownLatch releaseTask = new CountDownLatch(1);

    final ClusterService masterClusterService
        = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());
    masterClusterService.submitStateUpdateTask("block-state-updates", new ClusterStateUpdateTask() {
        @Override
        public ClusterState execute(ClusterState currentState) throws Exception {
            // announce that the task is running, then hold it open until the indexing attempt is over
            taskStarted.countDown();
            releaseTask.await();
            // the task itself changes nothing
            return currentState;
        }

        @Override
        public void onFailure(String source, Exception e) {
            throw new AssertionError("unexpected", e);
        }
    });

    // do not start indexing before the task above is actually executing
    taskStarted.await();

    final IndexRequestBuilder secondDocument
        = client().prepareIndex("index", MapperService.SINGLE_MAPPING_NAME).setId("2").setSource("field2", "value2");
    try {
        final IllegalArgumentException rejection
            = expectThrows(IllegalArgumentException.class, () -> secondDocument.get(TimeValue.timeValueSeconds(10)));
        assertThat(rejection.getMessage(), Matchers.containsString("Limit of total fields [2] in index [index] has been exceeded"));
    } finally {
        // always let the parked task finish, whether or not the assertion held
        releaseTask.countDown();
    }
}
}

View File

@ -57,6 +57,8 @@ import java.util.concurrent.ExecutionException;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class MapperServiceTests extends ESSingleNodeTestCase {
@ -127,6 +129,15 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
assertNull(indexService.mapperService().documentMapper(MapperService.DEFAULT_MAPPING));
}
/**
 * Merging with {@link MergeReason#MAPPING_UPDATE_PREFLIGHT} must leave the field lookup empty, whereas merging the same
 * mapping with {@link MergeReason#MAPPING_UPDATE} must make the field resolvable.
 */
public void testPreflightUpdateDoesNotChangeMapping() throws Throwable {
    final MapperService service = createIndex("test1").mapperService();
    final CompressedXContent singleFieldMapping = createMappingSpecifyingNumberOfFields(1);
    // name looked up after each merge; presumably the one field the helper above generates
    final String fieldName = "field0";

    // pre-flight merge: the lookup must still come back empty afterwards
    service.merge("type", singleFieldMapping, MergeReason.MAPPING_UPDATE_PREFLIGHT);
    assertThat("field was not created by preflight check", service.fullName(fieldName), nullValue());

    // real update with the very same mapping: now the lookup must succeed
    service.merge("type", singleFieldMapping, MergeReason.MAPPING_UPDATE);
    assertThat("field was not created by mapping update", service.fullName(fieldName), notNullValue());
}
/**
* Test that we can have at least the number of fields in new mappings that are defined by "index.mapping.total_fields.limit".
* Any additional field should trigger an IllegalArgumentException.
@ -141,7 +152,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
// adding one more field should trigger exception
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
createIndex("test2", settings).mapperService().merge("type",
createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), MergeReason.MAPPING_UPDATE);
createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), updateOrPreflight());
});
assertTrue(e.getMessage(),
e.getMessage().contains("Limit of total fields [" + totalFieldsLimit + "] in index [test2] has been exceeded"));
@ -177,7 +188,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
indexService2.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> indexService1.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE));
() -> indexService1.mapperService().merge("type", objectMapping, updateOrPreflight()));
assertThat(e.getMessage(), containsString("Limit of mapping depth [1] in index [test1] has been exceeded"));
}
@ -228,7 +239,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
.endObject().endObject()));
invalidNestedException = expectThrows(IllegalArgumentException.class,
() -> indexService.mapperService().merge("t", nestedFieldMapping,
MergeReason.MAPPING_UPDATE));
updateOrPreflight()));
assertThat(invalidNestedException.getMessage(),
containsString("cannot have nested fields when index sort is activated"));
}
@ -264,7 +275,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
.endObject()));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE));
() -> mapperService.merge("type", mappingUpdate, updateOrPreflight()));
assertThat(e.getMessage(), containsString("Invalid [path] value [nested.field] for field alias [alias]"));
}
@ -292,7 +303,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
createIndex("test2",
Settings.builder().put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), numberOfNonAliasFields).build())
.mapperService().merge("type", new CompressedXContent(mapping), MergeReason.MAPPING_UPDATE);
.mapperService().merge("type", new CompressedXContent(mapping), updateOrPreflight());
});
assertEquals("Limit of total fields [" + numberOfNonAliasFields + "] in index [test2] has been exceeded", e.getMessage());
}
@ -334,7 +345,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
.endObject()));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE);
mapperService.merge("type", mappingUpdate, updateOrPreflight());
});
assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
@ -359,7 +370,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
.endObject().endObject()));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
mapperService.merge("type", mapping, updateOrPreflight());
});
assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
@ -388,7 +399,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
.endObject().endObject()));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE);
mapperService.merge("type", mapping, updateOrPreflight());
});
assertEquals("Field name [" + testString + "] in index [test1] is too long. " +
@ -479,6 +490,10 @@ public class MapperServiceTests extends ESSingleNodeTestCase {
return true;
}
/**
 * Picks, via {@code randomFrom}, one of {@link MergeReason#MAPPING_UPDATE} and {@link MergeReason#MAPPING_UPDATE_PREFLIGHT},
 * so that a test using it runs against either merge reason.
 */
private static MergeReason updateOrPreflight() {
    final MergeReason chosen = randomFrom(MergeReason.MAPPING_UPDATE, MergeReason.MAPPING_UPDATE_PREFLIGHT);
    return chosen;
}
public static final class ReloadableFilterPlugin extends Plugin implements AnalysisPlugin {
@Override