diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 8cb4f8ff3f8..f376c2c07b4 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -165,7 +165,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } ops[requestIndex] = result.op; } - } catch (WriteFailure e){ + } catch (WriteFailure e) { Tuple mappingsToUpdateOnFailure = e.mappingsToUpdate; if (mappingsToUpdateOnFailure != null) { mappingsToUpdate.add(mappingsToUpdateOnFailure); @@ -410,7 +410,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation Engine.IndexingOperation op; try { if (indexRequest.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY); + Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates()); if (index.parsedDoc().mappingsModified()) { mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type()); } @@ -419,7 +419,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation op = index; created = index.created(); } else { - Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY); + Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, + request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId()); if (create.parsedDoc().mappingsModified()) { mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type()); } @@ -546,6 +547,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } } + protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); final BulkShardRequest request = shardRequest.request; @@ -561,12 +563,12 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl()); if (indexRequest.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA); + Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates()); indexShard.index(index); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), - Engine.Operation.Origin.REPLICA); + Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId()); indexShard.create(create); } } catch (Throwable e) { diff --git a/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 49e81cf15d4..38751a98c39 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -20,10 +20,7 @@ package org.elasticsearch.action.index; import com.google.common.base.Charsets; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchGenerationException; -import org.elasticsearch.ElasticsearchIllegalArgumentException; -import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.*; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; @@ -128,6 +125,7 @@ public class IndexRequest extends ShardReplicationOperationRequest private boolean sourceUnsafe; private OpType opType = OpType.INDEX; + private boolean autoGeneratedId = false; private boolean refresh = false; private long version = Versions.MATCH_ANY; @@ -541,6 +539,13 @@ public class IndexRequest extends ShardReplicationOperationRequest return this.versionType; } + /** + * Has the id been auto generated? + */ + public boolean autoGeneratedId() { + return this.autoGeneratedId; + } + public void process(MetaData metaData, String aliasOrIndex, @Nullable MappingMetaData mappingMd, boolean allowIdGeneration) throws ElasticsearchException { // resolve the routing if needed routing(metaData.resolveIndexRouting(routing, aliasOrIndex)); @@ -597,6 +602,7 @@ public class IndexRequest extends ShardReplicationOperationRequest id(Strings.randomBase64UUID()); // since we generate the id, change it to CREATE opType(IndexRequest.OpType.CREATE); + autoGeneratedId = true; } } @@ -622,6 +628,9 @@ public class IndexRequest extends ShardReplicationOperationRequest refresh = in.readBoolean(); version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); + if (in.getVersion().onOrAfter(Version.V_1_2_0)) { + autoGeneratedId = in.readBoolean(); + } } @Override @@ -638,6 +647,9 @@ public class IndexRequest extends ShardReplicationOperationRequest out.writeBoolean(refresh); out.writeLong(version); out.writeByte(versionType.getValue()); + if (out.getVersion().onOrAfter(Version.V_1_2_0)) { + out.writeBoolean(autoGeneratedId); + } } @Override diff --git a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index a0bf4933e0f..50163c91726 100644 --- a/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -191,7 +191,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi boolean created; Engine.IndexingOperation op; if (request.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY); + Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates()); if (index.parsedDoc().mappingsModified()) { mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false); } @@ -201,7 +201,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi created = index.created(); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, - request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY); + request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId()); if (create.parsedDoc().mappingsModified()) { mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false); } @@ -235,11 +235,11 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); if (request.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA); + Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates()); indexShard.index(index); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, - request.version(), request.versionType(), Engine.Operation.Origin.REPLICA); + request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId()); indexShard.create(create); } if (request.refresh()) { diff --git a/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java b/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java index 7bcafbbd72a..4809b839fdf 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java +++ b/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.support.replication; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.WriteConsistencyLevel; @@ -45,6 +46,7 @@ public abstract class ShardReplicationOperationRequest gcDeletesInMillis) { - currentVersion = Versions.NOT_FOUND; // deleted, and GC + versionValue = versionMap.get(versionKey); + if (versionValue == null) { + currentVersion = loadCurrentVersionFromIndex(create.uid()); } else { - currentVersion = versionValue.version(); + if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) { + currentVersion = Versions.NOT_FOUND; // deleted, and GC + } else { + currentVersion = versionValue.version(); + } } } diff --git a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java index c880343ff9b..4a5ea5bd620 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java @@ -129,11 +129,11 @@ public interface IndexShard extends IndexShardComponent { IndexShardState state(); - Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException; + Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) throws ElasticsearchException; ParsedDocument create(Engine.Create create) throws ElasticsearchException; - Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException; + Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException; ParsedDocument index(Engine.Index index) throws ElasticsearchException; diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 3839149a40e..722baef6934 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -367,11 +367,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } @Override - public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException { + public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) throws ElasticsearchException { long startTime = System.nanoTime(); DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type()); ParsedDocument doc = docMapper.parse(source); - return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime); + return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId); } @Override @@ -388,11 +388,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } @Override - public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException { + public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException { long startTime = System.nanoTime(); DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type()); ParsedDocument doc = docMapper.parse(source); - return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime); + return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates); } @Override @@ -747,13 +747,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I engine.create(prepareCreate( source(create.source()).type(create.type()).id(create.id()) .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()), - create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY)); + create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false)); break; case SAVE: Translog.Index index = (Translog.Index) operation; engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id()) .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()), - index.version(),index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY)); + index.version(),index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true)); break; case DELETE: Translog.Delete delete = (Translog.Delete) operation; diff --git a/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java b/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java index e36f645ab25..a46cd529d07 100644 --- a/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java +++ b/src/test/java/org/elasticsearch/benchmark/stress/SingleThreadBulkStress.java @@ -50,6 +50,7 @@ public class SingleThreadBulkStress { int shardsCount = Integer.parseInt(System.getProperty("es.shards", "1")); int replicaCount = Integer.parseInt(System.getProperty("es.replica", "1")); + boolean autoGenerateId = true; Settings settings = settingsBuilder() .put("index.refresh_interval", "1s") @@ -94,7 +95,7 @@ public class SingleThreadBulkStress { BulkRequestBuilder request = client1.prepareBulk(); for (int j = 0; j < BATCH; j++) { counter++; - request.add(Requests.indexRequest("test").type("type1").id(Integer.toString(counter)).source(source(Integer.toString(counter), "test" + counter))); + request.add(Requests.indexRequest("test").type("type1").id(autoGenerateId ? null : Integer.toString(counter)).source(source(Integer.toString(counter), "test" + counter))); } BulkResponse response = request.execute().actionGet(); if (response.hasFailures()) {