Only re-parse operation if a mapping update was needed

When executing an index operation on the primary shard,
`TransportShardBulkAction` first parses the document, checks whether any
mapping updates need to be applied, and then updates the mapping on the
master node. It then re-parses the document to make sure that the mappings
have been applied and propagated.

This commit adds a check that skips the second parsing of the document when
no mapping update was applied on the first pass.

Fixes a performance regression introduced in #23665
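
In outline, the new primary-side flow is the following simplified sketch (names
taken from the diff below; error handling omitted):

    Engine.Index operation = prepareIndexOperationOnPrimary(request, primary); // first (and often only) parse
    Mapping mappingUpdate = operation.parsedDoc().dynamicMappingsUpdate();
    if (mappingUpdate != null) {
        // dynamic mapping updates were produced: apply them on the master ...
        mappingUpdater.updateMappings(mappingUpdate, primary.shardId(), request.type());
        // ... then re-parse against the updated mappings and verify nothing is still pending
        operation = prepareIndexOperationOnPrimary(request, primary);
        mappingUpdater.verifyMappings(operation, primary.shardId());
    }
    // if no mapping update was needed, the first parse is reused directly
    return primary.index(operation);
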
Lee Hinman 2017-03-30 10:43:03 -06:00
parent d5d0f140d6
commit 0257a7b97a
8 changed files with 119 additions and 39 deletions


@@ -21,6 +21,7 @@ package org.elasticsearch.action.bulk;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
@@ -29,13 +30,9 @@ import java.util.Objects;
 public interface MappingUpdatePerformer {

     /**
-     * Determine if any mappings need to be updated, and update them on the master node if
-     * necessary. Returnes a failure Exception in the event updating the mappings fails or null if
-     * successful.
+     * Update the mappings on the master.
      */
-    void updateMappingsIfNeeded(Engine.Index operation,
-                                ShardId shardId,
-                                String type) throws Exception;
+    void updateMappings(Mapping update, ShardId shardId, String type) throws Exception;

     /**
      * Throws a {@code ReplicationOperation.RetryOnPrimaryException} if the operation needs to be

@@ -485,9 +485,16 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary,
             MappingUpdatePerformer mappingUpdater) throws Exception {
         // Update the mappings if parsing the documents includes new dynamic updates
+        final Engine.Index preUpdateOperation;
+        final Mapping mappingUpdate;
+        final boolean mappingUpdateNeeded;
         try {
-            final Engine.Index preUpdateOperation = prepareIndexOperationOnPrimary(request, primary);
-            mappingUpdater.updateMappingsIfNeeded(preUpdateOperation, primary.shardId(), request.type());
+            preUpdateOperation = prepareIndexOperationOnPrimary(request, primary);
+            mappingUpdate = preUpdateOperation.parsedDoc().dynamicMappingsUpdate();
+            mappingUpdateNeeded = mappingUpdate != null;
+            if (mappingUpdateNeeded) {
+                mappingUpdater.updateMappings(mappingUpdate, primary.shardId(), request.type());
+            }
         } catch (MapperParsingException | IllegalArgumentException failure) {
             return new Engine.IndexResult(failure, request.version());
         }
@@ -495,13 +502,18 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
         // Verify that there are no more mappings that need to be applied. If there are failures, a
         // ReplicationOperation.RetryOnPrimaryException is thrown.
         final Engine.Index operation;
-        try {
-            operation = prepareIndexOperationOnPrimary(request, primary);
-            mappingUpdater.verifyMappings(operation, primary.shardId());
-        } catch (MapperParsingException | IllegalStateException e) {
-            // there was an error in parsing the document that was not because
-            // of pending mapping updates, so return a failure for the result
-            return new Engine.IndexResult(e, request.version());
+        if (mappingUpdateNeeded) {
+            try {
+                operation = prepareIndexOperationOnPrimary(request, primary);
+                mappingUpdater.verifyMappings(operation, primary.shardId());
+            } catch (MapperParsingException | IllegalStateException e) {
+                // there was an error in parsing the document that was not because
+                // of pending mapping updates, so return a failure for the result
+                return new Engine.IndexResult(e, request.version());
+            }
+        } else {
+            // There was no mapping update, the operation is the same as the pre-update version.
+            operation = preUpdateOperation;
         }
         return primary.index(operation);
@@ -523,9 +535,8 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
     class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer {

-        public void updateMappingsIfNeeded(final Engine.Index operation, final ShardId shardId,
-                                           final String type) throws Exception {
-            final Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
+        public void updateMappings(final Mapping update, final ShardId shardId,
+                                   final String type) throws Exception {
             if (update != null) {
                 // can throw timeout exception when updating mappings or ISE for attempting to
                 // update default mappings which are bubbled up


@@ -45,6 +45,7 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -57,9 +58,18 @@ import org.elasticsearch.action.bulk.MappingUpdatePerformer;
 import org.elasticsearch.action.bulk.BulkItemResultHolder;

 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;

 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.containsString;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;

 public class TransportShardBulkActionTests extends IndexShardTestCase {
@@ -72,7 +82,9 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
     private IndexMetaData indexMetaData() throws IOException {
         return IndexMetaData.builder("index")
-                .putMapping("type", "{\"properties\": {\"foo\": {\"type\": \"text\"}}}")
+                .putMapping("type",
+                        "{\"properties\":{\"foo\":{\"type\":\"text\",\"fields\":" +
+                                "{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}}}}")
                 .settings(idxSettings)
                 .primaryTerm(0, 1).build();
     }
@@ -93,7 +105,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
         request = new BulkItemRequest(0, writeRequest);
         request.setPrimaryResponse(
                 new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
-                        new BulkItemResponse.Failure("test", "type", "id",
+                        new BulkItemResponse.Failure("index", "type", "id",
                                 new IllegalArgumentException("i died"))));

         assertFalse(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0));
@@ -503,6 +515,70 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
     }

+    public void testMappingUpdateParsesCorrectNumberOfTimes() throws Exception {
+        IndexMetaData metaData = indexMetaData();
+        logger.info("--> metadata.getIndex(): {}", metaData.getIndex());
+        final IndexShard shard = spy(newStartedShard(true));
+
+        IndexRequest request = new IndexRequest("index", "type", "id")
+                .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
+
+        final AtomicInteger updateCalled = new AtomicInteger(0);
+        final AtomicInteger verifyCalled = new AtomicInteger(0);
+        TransportShardBulkAction.executeIndexRequestOnPrimary(request, shard,
+                new MappingUpdatePerformer() {
+                    @Override
+                    public void updateMappings(Mapping update, ShardId shardId,
+                                               String type) throws Exception {
+                        // There should indeed be a mapping update
+                        assertNotNull(update);
+                        updateCalled.incrementAndGet();
+                    }
+
+                    @Override
+                    public void verifyMappings(Engine.Index operation,
+                                               ShardId shardId) throws Exception {
+                        // No-op, will be called
+                        logger.info("--> verifying mappings noop");
+                        verifyCalled.incrementAndGet();
+                    }
+                });
+
+        assertThat("mappings were \"updated\" once", updateCalled.get(), equalTo(1));
+        assertThat("mappings were \"verified\" once", verifyCalled.get(), equalTo(1));
+
+        // Verify that the shard "prepared" the operation twice
+        verify(shard, times(2)).prepareIndexOnPrimary(any(), anyLong(), any(),
+                anyLong(), anyBoolean());
+
+        // Update the mapping, so the next mapping updater doesn't do anything
+        final MapperService mapperService = shard.mapperService();
+        logger.info("--> mapperService.index(): {}", mapperService.index());
+        mapperService.updateMapping(metaData);
+
+        TransportShardBulkAction.executeIndexRequestOnPrimary(request, shard,
+                new MappingUpdatePerformer() {
+                    @Override
+                    public void updateMappings(Mapping update, ShardId shardId,
+                                               String type) throws Exception {
+                        fail("should not have had to update the mappings");
+                    }
+
+                    @Override
+                    public void verifyMappings(Engine.Index operation,
+                                               ShardId shardId) throws Exception {
+                        fail("should not have had to update the mappings");
+                    }
+                });
+
+        // Verify that the shard "prepared" the operation only once (2 for previous invocations plus
+        // 1 for this execution)
+        verify(shard, times(3)).prepareIndexOnPrimary(any(), anyLong(), any(),
+                anyLong(), anyBoolean());
+
+        closeShards(shard);
+    }
+
     public class IndexResultWithLocation extends Engine.IndexResult {
         private final Translog.Location location;

         public IndexResultWithLocation(long version, long seqNo, boolean created,
@@ -557,9 +633,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
     /** Doesn't perform any mapping updates */
     public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer {
-        public void updateMappingsIfNeeded(Engine.Index operation,
-                                           ShardId shardId,
-                                           String type) throws Exception {
+        public void updateMappings(Mapping update, ShardId shardId, String type) throws Exception {
         }

         public void verifyMappings(Engine.Index operation, ShardId shardId) throws Exception {
@@ -573,9 +647,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             this.e = e;
         }

-        public void updateMappingsIfNeeded(Engine.Index operation,
-                                           ShardId shardId,
-                                           String type) throws Exception {
+        public void updateMappings(Mapping update, ShardId shardId, String type) throws Exception {
             throw e;
         }
@@ -591,9 +663,7 @@ public class TransportShardBulkActionTests extends IndexShardTestCase {
             this.e = e;
         }

-        public void updateMappingsIfNeeded(Engine.Index operation,
-                                           ShardId shardId,
-                                           String type) throws Exception {
+        public void updateMappings(Mapping update, ShardId shardId, String type) throws Exception {
         }

         public void verifyMappings(Engine.Index operation, ShardId shardId) throws Exception {


@@ -49,7 +49,7 @@ public class MapperTests extends ESTestCase {
         XContentBuilder mapping = createMappingWithIncludeInAll();
         Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();

-        final MapperService currentMapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), settings);
+        final MapperService currentMapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), settings, "test");

         Exception e = expectThrows(MapperParsingException.class, () ->
                 currentMapperService.parse("type", new CompressedXContent(mapping.string()), true));
         assertEquals("[include_in_all] is not allowed for indices created on or after version 6.0.0 as [_all] is deprecated. " +
@@ -59,7 +59,7 @@ public class MapperTests extends ESTestCase {
         settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_5_3_0_UNRELEASED).build();

         // Create the mapping service with an older index creation version
-        final MapperService oldMapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), settings);
+        final MapperService oldMapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), settings, "test");

         // Should not throw an exception now
         oldMapperService.parse("type", new CompressedXContent(mapping.string()), true);
     }


@@ -37,7 +37,7 @@ public class MultiFieldCopyToMapperTests extends ESTestCase {
         XContentBuilder mapping = createMappinmgWithCopyToInMultiField();

         // first check that for newer versions we throw exception if copy_to is found withing multi field
-        MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY);
+        MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "test");
         try {
             mapperService.parse("type", new CompressedXContent(mapping.string()), true);
             fail("Parsing should throw an exception because the mapping contains a copy_to in a multi field");


@@ -34,7 +34,7 @@ public class MultiFieldIncludeInAllMapperTests extends ESTestCase {
         XContentBuilder mapping = createMappingWithIncludeInAllInMultiField();

         // first check that for newer versions we throw exception if include_in_all is found withing multi field
-        MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY);
+        MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "test");
         Exception e = expectThrows(MapperParsingException.class, () ->
                 mapperService.parse("type", new CompressedXContent(mapping.string()), true));
         assertEquals("include_in_all in multi fields is not allowed. Found the include_in_all in field [c] which is within a multi field.",


@@ -40,14 +40,16 @@ import static org.elasticsearch.test.ESTestCase.createTestAnalysis;

 public class MapperTestUtils {

-    public static MapperService newMapperService(NamedXContentRegistry xContentRegistry, Path tempDir, Settings indexSettings)
-            throws IOException {
+    public static MapperService newMapperService(NamedXContentRegistry xContentRegistry,
+                                                 Path tempDir,
+                                                 Settings indexSettings,
+                                                 String indexName) throws IOException {
         IndicesModule indicesModule = new IndicesModule(Collections.emptyList());
-        return newMapperService(xContentRegistry, tempDir, indexSettings, indicesModule);
+        return newMapperService(xContentRegistry, tempDir, indexSettings, indicesModule, indexName);
     }

     public static MapperService newMapperService(NamedXContentRegistry xContentRegistry, Path tempDir, Settings settings,
-                                                 IndicesModule indicesModule) throws IOException {
+                                                 IndicesModule indicesModule, String indexName) throws IOException {
         Settings.Builder settingsBuilder = Settings.builder()
             .put(Environment.PATH_HOME_SETTING.getKey(), tempDir)
             .put(settings);
@@ -56,7 +58,7 @@ public class MapperTestUtils {
         }
         Settings finalSettings = settingsBuilder.build();
         MapperRegistry mapperRegistry = indicesModule.getMapperRegistry();
-        IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", finalSettings);
+        IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexName, finalSettings);
         IndexAnalyzers indexAnalyzers = createTestAnalysis(indexSettings, finalSettings).indexAnalyzers;
         SimilarityService similarityService = new SimilarityService(indexSettings, Collections.emptyMap());
         return new MapperService(indexSettings,


@@ -268,7 +268,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
         try {
             IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null);
             MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(),
-                indexSettings.getSettings());
+                indexSettings.getSettings(), "index");
             mapperService.merge(indexMetaData, MapperService.MergeReason.MAPPING_RECOVERY, true);
             SimilarityService similarityService = new SimilarityService(indexSettings, Collections.emptyMap());
             final IndexEventListener indexEventListener = new IndexEventListener() {