From 65bc017271db1bd4a18b89d6ce1b8512984206ee Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sat, 12 Apr 2014 12:10:56 +0200 Subject: [PATCH] Don't lookup version for auto generated id and create When a create document is executed, and its an auto generated id (based on UUID), we know that the document will not exists in the index, so there is no need to try and lookup the version from the index. For many cases, like logging, where ids are auto generated, this can improve the indexing performance, specifically for lightweight documents where analysis is not a big part of the execution. closes #5917 --- .../action/bulk/TransportShardBulkAction.java | 12 ++++---- .../action/index/IndexRequest.java | 20 ++++++++++--- .../action/index/TransportIndexAction.java | 8 ++--- .../ShardReplicationOperationRequest.java | 19 ++++++++++++ ...nsportShardReplicationOperationAction.java | 12 +++++++- .../elasticsearch/index/engine/Engine.java | 29 ++++++++++++++++--- .../index/engine/internal/InternalEngine.java | 20 +++++++++---- .../index/shard/service/IndexShard.java | 4 +-- .../shard/service/InternalIndexShard.java | 12 ++++---- .../stress/SingleThreadBulkStress.java | 3 +- 10 files changed, 106 insertions(+), 33 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 8cb4f8ff3f8..f376c2c07b4 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -165,7 +165,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } ops[requestIndex] = result.op; } - } catch (WriteFailure e){ + } catch (WriteFailure e) { Tuple mappingsToUpdateOnFailure = e.mappingsToUpdate; if (mappingsToUpdateOnFailure != null) { mappingsToUpdate.add(mappingsToUpdateOnFailure); @@ -410,7 +410,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation Engine.IndexingOperation op; try { if (indexRequest.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY); + Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates()); if (index.parsedDoc().mappingsModified()) { mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type()); } @@ -419,7 +419,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation op = index; created = index.created(); } else { - Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY); + Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, + request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId()); if (create.parsedDoc().mappingsModified()) { mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type()); } @@ -546,6 +547,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } } + protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); final BulkShardRequest request = shardRequest.request; @@ -561,12 +563,12 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl()); if (indexRequest.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA); + Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates()); indexShard.index(index); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), - Engine.Operation.Origin.REPLICA); + Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId()); indexShard.create(create); } } catch (Throwable e) { diff --git a/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 49e81cf15d4..38751a98c39 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -20,10 +20,7 @@ package org.elasticsearch.action.index; import com.google.common.base.Charsets; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchGenerationException; -import org.elasticsearch.ElasticsearchIllegalArgumentException; -import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.*; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; @@ -128,6 +125,7 @@ public class IndexRequest extends ShardReplicationOperationRequest private boolean sourceUnsafe; private OpType opType = OpType.INDEX; + private boolean autoGeneratedId = false; private boolean refresh = false; private long version = Versions.MATCH_ANY; @@ -541,6 +539,13 @@ public class IndexRequest extends ShardReplicationOperationRequest return this.versionType; } + /** + * Has the id been auto generated? + */ + public boolean autoGeneratedId() { + return this.autoGeneratedId; + } + public void process(MetaData metaData, String aliasOrIndex, @Nullable MappingMetaData mappingMd, boolean allowIdGeneration) throws ElasticsearchException { // resolve the routing if needed routing(metaData.resolveIndexRouting(routing, aliasOrIndex)); @@ -597,6 +602,7 @@ public class IndexRequest extends ShardReplicationOperationRequest id(Strings.randomBase64UUID()); // since we generate the id, change it to CREATE opType(IndexRequest.OpType.CREATE); + autoGeneratedId = true; } } @@ -622,6 +628,9 @@ public class IndexRequest extends ShardReplicationOperationRequest refresh = in.readBoolean(); version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); + if (in.getVersion().onOrAfter(Version.V_1_2_0)) { + autoGeneratedId = in.readBoolean(); + } } @Override @@ -638,6 +647,9 @@ public class IndexRequest extends ShardReplicationOperationRequest out.writeBoolean(refresh); out.writeLong(version); out.writeByte(versionType.getValue()); + if (out.getVersion().onOrAfter(Version.V_1_2_0)) { + out.writeBoolean(autoGeneratedId); + } } @Override diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index a0bf4933e0f..50163c91726 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -191,7 +191,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi boolean created; Engine.IndexingOperation op; if (request.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY); + Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates()); if (index.parsedDoc().mappingsModified()) { mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false); } @@ -201,7 +201,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi created = index.created(); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, - request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY); + request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId()); if (create.parsedDoc().mappingsModified()) { mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false); } @@ -235,11 +235,11 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); if (request.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA); + Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates()); indexShard.index(index); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, - request.version(), request.versionType(), Engine.Operation.Origin.REPLICA); + request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId()); indexShard.create(create); } if (request.refresh()) { diff --git a/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java b/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java index 7bcafbbd72a..4809b839fdf 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java +++ b/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.support.replication; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.WriteConsistencyLevel; @@ -45,6 +46,7 @@ public abstract class ShardReplicationOperationRequest gcDeletesInMillis) { - currentVersion = Versions.NOT_FOUND; // deleted, and GC + versionValue = versionMap.get(versionKey); + if (versionValue == null) { + currentVersion = loadCurrentVersionFromIndex(create.uid()); } else { - currentVersion = versionValue.version(); + if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) { + currentVersion = Versions.NOT_FOUND; // deleted, and GC + } else { + currentVersion = versionValue.version(); + } } } diff --git a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java index c880343ff9b..4a5ea5bd620 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java @@ -129,11 +129,11 @@ public interface IndexShard extends IndexShardComponent { IndexShardState state(); - Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException; + Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) throws ElasticsearchException; ParsedDocument create(Engine.Create create) throws ElasticsearchException; - Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException; + Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException; ParsedDocument index(Engine.Index index) throws ElasticsearchException; diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 3839149a40e..722baef6934 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -367,11 +367,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } @Override - public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException { + public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) throws ElasticsearchException { long startTime = System.nanoTime(); DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type()); ParsedDocument doc = docMapper.parse(source); - return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime); + return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId); } @Override @@ -388,11 +388,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } @Override - public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException { + public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException { long startTime = System.nanoTime(); DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type()); ParsedDocument doc = docMapper.parse(source); - return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime); + return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates); } @Override @@ -747,13 +747,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I engine.create(prepareCreate( source(create.source()).type(create.type()).id(create.id()) .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()), - create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY)); + create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false)); break; case SAVE: Translog.Index index = (Translog.Index) operation; engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id()) .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()), - index.version(),index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY)); + index.version(),index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true)); break; case DELETE: Translog.Delete delete = (Translog.Delete) operation; diff --git a/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java b/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java index e36f645ab25..a46cd529d07 100644 --- a/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java +++ b/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java @@ -50,6 +50,7 @@ public class SingleThreadBulkStress { int shardsCount = Integer.parseInt(System.getProperty("es.shards", "1")); int replicaCount = Integer.parseInt(System.getProperty("es.replica", "1")); + boolean autoGenerateId = true; Settings settings = settingsBuilder() .put("index.refresh_interval", "1s") @@ -94,7 +95,7 @@ public class SingleThreadBulkStress { BulkRequestBuilder request = client1.prepareBulk(); for (int j = 0; j < BATCH; j++) { counter++; - request.add(Requests.indexRequest("test").type("type1").id(Integer.toString(counter)).source(source(Integer.toString(counter), "test" + counter))); + request.add(Requests.indexRequest("test").type("type1").id(autoGenerateId ? null : Integer.toString(counter)).source(source(Integer.toString(counter), "test" + counter))); } BulkResponse response = request.execute().actionGet(); if (response.hasFailures()) {