diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index bff9e24d0e9..8344ae15e6c 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -424,7 +424,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation op = index; created = index.created(); } else { - Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); + Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY) + .autoGeneratedId(indexRequest.autoGeneratedId()); if (create.parsedDoc().mappingsModified()) { mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type()); } @@ -573,6 +574,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } else { Engine.Create create = indexShard.prepareCreate(sourceToParse) .version(indexRequest.version()).versionType(indexRequest.versionType()) + .autoGeneratedId(indexRequest.autoGeneratedId()) .origin(Engine.Operation.Origin.REPLICA); indexShard.create(create); } diff --git a/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 49e81cf15d4..38751a98c39 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -20,10 +20,7 @@ package org.elasticsearch.action.index; import com.google.common.base.Charsets; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchGenerationException; -import org.elasticsearch.ElasticsearchIllegalArgumentException; -import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.*; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; @@ -128,6 +125,7 @@ public class IndexRequest extends ShardReplicationOperationRequest private boolean sourceUnsafe; private OpType opType = OpType.INDEX; + private boolean autoGeneratedId = false; private boolean refresh = false; private long version = Versions.MATCH_ANY; @@ -541,6 +539,13 @@ public class IndexRequest extends ShardReplicationOperationRequest return this.versionType; } + /** + * Has the id been auto generated? + */ + public boolean autoGeneratedId() { + return this.autoGeneratedId; + } + public void process(MetaData metaData, String aliasOrIndex, @Nullable MappingMetaData mappingMd, boolean allowIdGeneration) throws ElasticsearchException { // resolve the routing if needed routing(metaData.resolveIndexRouting(routing, aliasOrIndex)); @@ -597,6 +602,7 @@ public class IndexRequest extends ShardReplicationOperationRequest id(Strings.randomBase64UUID()); // since we generate the id, change it to CREATE opType(IndexRequest.OpType.CREATE); + autoGeneratedId = true; } } @@ -622,6 +628,9 @@ public class IndexRequest extends ShardReplicationOperationRequest refresh = in.readBoolean(); version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); + if (in.getVersion().onOrAfter(Version.V_1_2_0)) { + autoGeneratedId = in.readBoolean(); + } } @Override @@ -638,6 +647,9 @@ public class IndexRequest extends ShardReplicationOperationRequest out.writeBoolean(refresh); out.writeLong(version); out.writeByte(versionType.getValue()); + if (out.getVersion().onOrAfter(Version.V_1_2_0)) { + out.writeBoolean(autoGeneratedId); + } } @Override diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index b612844fbd2..b5f18d36c1f 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -215,6 +215,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi Engine.Create create = indexShard.prepareCreate(sourceToParse) .version(request.version()) .versionType(request.versionType()) + .autoGeneratedId(request.autoGeneratedId()) .origin(Engine.Operation.Origin.PRIMARY); if (create.parsedDoc().mappingsModified()) { updateMappingOnMaster(request, indexMetaData); @@ -256,6 +257,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } else { Engine.Create create = indexShard.prepareCreate(sourceToParse) .version(request.version()).versionType(request.versionType()) + .autoGeneratedId(request.autoGeneratedId()) .origin(Engine.Operation.Origin.REPLICA); indexShard.create(create); } diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java index 81934bbab7d..71663ed30ad 100644 --- a/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -388,6 +388,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent { private final Term uid; private final ParsedDocument doc; private long version = Versions.MATCH_ANY; + private boolean autoGeneratedId = false; private VersionType versionType = VersionType.INTERNAL; private Origin origin = Origin.PRIMARY; @@ -468,6 +469,15 @@ public interface Engine extends IndexShardComponent, CloseableComponent { return this; } + public Create autoGeneratedId(boolean autoGeneratedId) { + this.autoGeneratedId = autoGeneratedId; + return this; + } + + public boolean autoGeneratedId() { + return this.autoGeneratedId; + } + public String parent() { return this.doc.parent(); } diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index d9cacd94f30..a92a14965df 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -406,14 +406,20 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin synchronized (dirtyLock(create.uid())) { HashedBytesRef versionKey = versionKey(create.uid()); final long currentVersion; - VersionValue versionValue = versionMap.get(versionKey); - if (versionValue == null) { - currentVersion = loadCurrentVersionFromIndex(create.uid()); + final VersionValue versionValue; + if (create.autoGeneratedId()) { + currentVersion = Versions.NOT_FOUND; + versionValue = null; } else { - if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) { - currentVersion = Versions.NOT_FOUND; // deleted, and GC + versionValue = versionMap.get(versionKey); + if (versionValue == null) { + currentVersion = loadCurrentVersionFromIndex(create.uid()); } else { - currentVersion = versionValue.version(); + if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) { + currentVersion = Versions.NOT_FOUND; // deleted, and GC + } else { + currentVersion = versionValue.version(); + } } } diff --git a/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java b/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java index e36f645ab25..a46cd529d07 100644 --- a/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java +++ b/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java @@ -50,6 +50,7 @@ public class SingleThreadBulkStress { int shardsCount = Integer.parseInt(System.getProperty("es.shards", "1")); int replicaCount = Integer.parseInt(System.getProperty("es.replica", "1")); + boolean autoGenerateId = true; Settings settings = settingsBuilder() .put("index.refresh_interval", "1s") @@ -94,7 +95,7 @@ public class SingleThreadBulkStress { BulkRequestBuilder request = client1.prepareBulk(); for (int j = 0; j < BATCH; j++) { counter++; - request.add(Requests.indexRequest("test").type("type1").id(Integer.toString(counter)).source(source(Integer.toString(counter), "test" + counter))); + request.add(Requests.indexRequest("test").type("type1").id(autoGenerateId ? null : Integer.toString(counter)).source(source(Integer.toString(counter), "test" + counter))); } BulkResponse response = request.execute().actionGet(); if (response.hasFailures()) {