diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 86b5cd93a08..0d8340ec722 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -48,16 +48,19 @@ import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.MapperException; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -260,6 +263,17 @@ public class TransportShardBulkAction extends TransportWriteAction new ParameterizedMessage("{} mapping update rejected by primary", primary.shardId()), e); + onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult); + return true; + } + mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(), context.getRequestToExecute().type(), new ActionListener() { diff --git a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java index 57d8f697090..5282faa7111 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -85,6 +85,10 @@ public class MapperService extends AbstractIndexComponent implements Closeable { * The reason why a mapping is being merged. */ public enum MergeReason { + /** + * Pre-flight check before sending a mapping update to the master + */ + MAPPING_UPDATE_PREFLIGHT, /** * Create or update a mapping. */ @@ -341,6 +345,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable { private synchronized Map internalMerge(IndexMetaData indexMetaData, MergeReason reason, boolean onlyUpdateIfNeeded) { + assert reason != MergeReason.MAPPING_UPDATE_PREFLIGHT; Map map = new LinkedHashMap<>(); for (ObjectCursor cursor : indexMetaData.getMappings().values()) { MappingMetaData mappingMetaData = cursor.value; @@ -494,7 +499,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable { ContextMapping.validateContextPaths(indexSettings.getIndexVersionCreated(), fieldMappers, fieldTypes::get); - if (reason == MergeReason.MAPPING_UPDATE) { + if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) { // this check will only be performed on the master node when there is // a call to the update mapping API. For all other cases like // the master node restoring mappings from disk or data nodes @@ -509,7 +514,7 @@ public class MapperService extends AbstractIndexComponent implements Closeable { results.put(newMapper.type(), newMapper); } - if (reason == MergeReason.MAPPING_UPDATE) { + if (reason == MergeReason.MAPPING_UPDATE || reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) { // this check will only be performed on the master node when there is // a call to the update mapping API. For all other cases like // the master node restoring mappings from disk or data nodes @@ -532,6 +537,10 @@ public class MapperService extends AbstractIndexComponent implements Closeable { // make structures immutable results = Collections.unmodifiableMap(results); + if (reason == MergeReason.MAPPING_UPDATE_PREFLIGHT) { + return results; + } + // only need to immutably rewrap these if the previous reference was changed. // if not then they are already implicitly immutable. if (fullPathObjectMappers != this.fullPathObjectMappers) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 8acb3e8cc93..46c16451ee5 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -44,8 +44,10 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.MetadataFieldMapper; +import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; @@ -235,7 +237,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { new BulkShardRequest(shardId, RefreshPolicy.NONE, items); Engine.IndexResult mappingUpdate = - new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap())); + new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap())); Translog.Location resultLocation = new Translog.Location(42, 42, 42); Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation); @@ -243,6 +245,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { when(shard.shardId()).thenReturn(shardId); when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())) .thenReturn(mappingUpdate); + when(shard.mapperService()).thenReturn(mock(MapperService.class)); randomlySetIgnoredPrimaryResponse(items[0]); @@ -770,7 +773,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { "I'm conflicted <(;_;)>"); Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0); Engine.IndexResult mappingUpdate = - new Engine.IndexResult(new Mapping(null, null, new MetadataFieldMapper[0], Collections.emptyMap())); + new Engine.IndexResult(new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap())); Translog.Location resultLocation = new Translog.Location(42, 42, 42); Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation); @@ -787,6 +790,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { }); when(shard.indexSettings()).thenReturn(indexSettings); when(shard.shardId()).thenReturn(shardId); + when(shard.mapperService()).thenReturn(mock(MapperService.class)); UpdateHelper updateHelper = mock(UpdateHelper.class); when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( diff --git a/server/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java b/server/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java index e1f235c19c6..89cda3e1c21 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/DynamicMappingIT.java @@ -21,8 +21,14 @@ package org.elasticsearch.index.mapper; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; @@ -35,6 +41,8 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING; + public class DynamicMappingIT extends ESIntegTestCase { @Override @@ -119,4 +127,39 @@ public class DynamicMappingIT extends ESIntegTestCase { assertTrue(client().prepareGet("index", "type", Integer.toString(i)).get().isExists()); } } + + public void testPreflightCheckAvoidsMaster() throws InterruptedException { + createIndex("index", Settings.builder().put(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), 2).build()); + ensureGreen("index"); + client().prepareIndex("index", MapperService.SINGLE_MAPPING_NAME).setId("1").setSource("field1", "value1").get(); + + final CountDownLatch masterBlockedLatch = new CountDownLatch(1); + final CountDownLatch indexingCompletedLatch = new CountDownLatch(1); + + internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName()).submitStateUpdateTask("block-state-updates", + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + masterBlockedLatch.countDown(); + indexingCompletedLatch.await(); + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + throw new AssertionError("unexpected", e); + } + }); + + masterBlockedLatch.await(); + final IndexRequestBuilder indexRequestBuilder + = client().prepareIndex("index", MapperService.SINGLE_MAPPING_NAME).setId("2").setSource("field2", "value2"); + try { + assertThat( + expectThrows(IllegalArgumentException.class, () -> indexRequestBuilder.get(TimeValue.timeValueSeconds(10))).getMessage(), + Matchers.containsString("Limit of total fields [2] in index [index] has been exceeded")); + } finally { + indexingCompletedLatch.countDown(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java b/server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java index 4478f2f4640..b7ef993a5bb 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java @@ -57,6 +57,8 @@ import java.util.concurrent.ExecutionException; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; public class MapperServiceTests extends ESSingleNodeTestCase { @@ -127,6 +129,15 @@ public class MapperServiceTests extends ESSingleNodeTestCase { assertNull(indexService.mapperService().documentMapper(MapperService.DEFAULT_MAPPING)); } + public void testPreflightUpdateDoesNotChangeMapping() throws Throwable { + final MapperService mapperService = createIndex("test1").mapperService(); + final CompressedXContent mapping = createMappingSpecifyingNumberOfFields(1); + mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE_PREFLIGHT); + assertThat("field was not created by preflight check", mapperService.fullName("field0"), nullValue()); + mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE); + assertThat("field was not created by mapping update", mapperService.fullName("field0"), notNullValue()); + } + /** * Test that we can have at least the number of fields in new mappings that are defined by "index.mapping.total_fields.limit". * Any additional field should trigger an IllegalArgumentException. @@ -141,7 +152,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase { // adding one more field should trigger exception IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { createIndex("test2", settings).mapperService().merge("type", - createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), MergeReason.MAPPING_UPDATE); + createMappingSpecifyingNumberOfFields(totalFieldsLimit + 1), updateOrPreflight()); }); assertTrue(e.getMessage(), e.getMessage().contains("Limit of total fields [" + totalFieldsLimit + "] in index [test2] has been exceeded")); @@ -177,7 +188,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase { indexService2.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> indexService1.mapperService().merge("type", objectMapping, MergeReason.MAPPING_UPDATE)); + () -> indexService1.mapperService().merge("type", objectMapping, updateOrPreflight())); assertThat(e.getMessage(), containsString("Limit of mapping depth [1] in index [test1] has been exceeded")); } @@ -228,7 +239,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase { .endObject().endObject())); invalidNestedException = expectThrows(IllegalArgumentException.class, () -> indexService.mapperService().merge("t", nestedFieldMapping, - MergeReason.MAPPING_UPDATE)); + updateOrPreflight())); assertThat(invalidNestedException.getMessage(), containsString("cannot have nested fields when index sort is activated")); } @@ -264,7 +275,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase { .endObject())); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE)); + () -> mapperService.merge("type", mappingUpdate, updateOrPreflight())); assertThat(e.getMessage(), containsString("Invalid [path] value [nested.field] for field alias [alias]")); } @@ -292,7 +303,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase { IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { createIndex("test2", Settings.builder().put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), numberOfNonAliasFields).build()) - .mapperService().merge("type", new CompressedXContent(mapping), MergeReason.MAPPING_UPDATE); + .mapperService().merge("type", new CompressedXContent(mapping), updateOrPreflight()); }); assertEquals("Limit of total fields [" + numberOfNonAliasFields + "] in index [test2] has been exceeded", e.getMessage()); } @@ -334,7 +345,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase { .endObject())); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { - mapperService.merge("type", mappingUpdate, MergeReason.MAPPING_UPDATE); + mapperService.merge("type", mappingUpdate, updateOrPreflight()); }); assertEquals("Field name [" + testString + "] in index [test1] is too long. " + @@ -359,7 +370,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase { .endObject().endObject())); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { - mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE); + mapperService.merge("type", mapping, updateOrPreflight()); }); assertEquals("Field name [" + testString + "] in index [test1] is too long. " + @@ -388,7 +399,7 @@ public class MapperServiceTests extends ESSingleNodeTestCase { .endObject().endObject())); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { - mapperService.merge("type", mapping, MergeReason.MAPPING_UPDATE); + mapperService.merge("type", mapping, updateOrPreflight()); }); assertEquals("Field name [" + testString + "] in index [test1] is too long. " + @@ -479,6 +490,10 @@ public class MapperServiceTests extends ESSingleNodeTestCase { return true; } + private static MergeReason updateOrPreflight() { + return randomFrom(MergeReason.MAPPING_UPDATE, MergeReason.MAPPING_UPDATE_PREFLIGHT); + } + public static final class ReloadableFilterPlugin extends Plugin implements AnalysisPlugin { @Override