Don't lookup version for auto generated id and create
When a create document is executed, and its an auto generated id (based on UUID), we know that the document will not exists in the index, so there is no need to try and lookup the version from the index. For many cases, like logging, where ids are auto generated, this can improve the indexing performance, specifically for lightweight documents where analysis is not a big part of the execution. closes #5917
This commit is contained in:
parent
0b3605f4f2
commit
65bc017271
|
@ -165,7 +165,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||||
}
|
}
|
||||||
ops[requestIndex] = result.op;
|
ops[requestIndex] = result.op;
|
||||||
}
|
}
|
||||||
} catch (WriteFailure e){
|
} catch (WriteFailure e) {
|
||||||
Tuple<String, String> mappingsToUpdateOnFailure = e.mappingsToUpdate;
|
Tuple<String, String> mappingsToUpdateOnFailure = e.mappingsToUpdate;
|
||||||
if (mappingsToUpdateOnFailure != null) {
|
if (mappingsToUpdateOnFailure != null) {
|
||||||
mappingsToUpdate.add(mappingsToUpdateOnFailure);
|
mappingsToUpdate.add(mappingsToUpdateOnFailure);
|
||||||
|
@ -410,7 +410,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||||
Engine.IndexingOperation op;
|
Engine.IndexingOperation op;
|
||||||
try {
|
try {
|
||||||
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
|
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
|
||||||
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY);
|
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
|
||||||
if (index.parsedDoc().mappingsModified()) {
|
if (index.parsedDoc().mappingsModified()) {
|
||||||
mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
|
mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
|
||||||
}
|
}
|
||||||
|
@ -419,7 +419,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||||
op = index;
|
op = index;
|
||||||
created = index.created();
|
created = index.created();
|
||||||
} else {
|
} else {
|
||||||
Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY);
|
Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY,
|
||||||
|
request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
|
||||||
if (create.parsedDoc().mappingsModified()) {
|
if (create.parsedDoc().mappingsModified()) {
|
||||||
mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
|
mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
|
||||||
}
|
}
|
||||||
|
@ -546,6 +547,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
|
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
|
||||||
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
|
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
|
||||||
final BulkShardRequest request = shardRequest.request;
|
final BulkShardRequest request = shardRequest.request;
|
||||||
|
@ -561,12 +563,12 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
||||||
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
|
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
|
||||||
|
|
||||||
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
|
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
|
||||||
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA);
|
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
|
||||||
indexShard.index(index);
|
indexShard.index(index);
|
||||||
} else {
|
} else {
|
||||||
Engine.Create create = indexShard.prepareCreate(sourceToParse,
|
Engine.Create create = indexShard.prepareCreate(sourceToParse,
|
||||||
indexRequest.version(), indexRequest.versionType(),
|
indexRequest.version(), indexRequest.versionType(),
|
||||||
Engine.Operation.Origin.REPLICA);
|
Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
|
||||||
indexShard.create(create);
|
indexShard.create(create);
|
||||||
}
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
|
|
@ -20,10 +20,7 @@
|
||||||
package org.elasticsearch.action.index;
|
package org.elasticsearch.action.index;
|
||||||
|
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.*;
|
||||||
import org.elasticsearch.ElasticsearchGenerationException;
|
|
||||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
import org.elasticsearch.action.RoutingMissingException;
|
import org.elasticsearch.action.RoutingMissingException;
|
||||||
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
|
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
|
||||||
|
@ -128,6 +125,7 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
|
||||||
private boolean sourceUnsafe;
|
private boolean sourceUnsafe;
|
||||||
|
|
||||||
private OpType opType = OpType.INDEX;
|
private OpType opType = OpType.INDEX;
|
||||||
|
private boolean autoGeneratedId = false;
|
||||||
|
|
||||||
private boolean refresh = false;
|
private boolean refresh = false;
|
||||||
private long version = Versions.MATCH_ANY;
|
private long version = Versions.MATCH_ANY;
|
||||||
|
@ -541,6 +539,13 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
|
||||||
return this.versionType;
|
return this.versionType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Has the id been auto generated?
|
||||||
|
*/
|
||||||
|
public boolean autoGeneratedId() {
|
||||||
|
return this.autoGeneratedId;
|
||||||
|
}
|
||||||
|
|
||||||
public void process(MetaData metaData, String aliasOrIndex, @Nullable MappingMetaData mappingMd, boolean allowIdGeneration) throws ElasticsearchException {
|
public void process(MetaData metaData, String aliasOrIndex, @Nullable MappingMetaData mappingMd, boolean allowIdGeneration) throws ElasticsearchException {
|
||||||
// resolve the routing if needed
|
// resolve the routing if needed
|
||||||
routing(metaData.resolveIndexRouting(routing, aliasOrIndex));
|
routing(metaData.resolveIndexRouting(routing, aliasOrIndex));
|
||||||
|
@ -597,6 +602,7 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
|
||||||
id(Strings.randomBase64UUID());
|
id(Strings.randomBase64UUID());
|
||||||
// since we generate the id, change it to CREATE
|
// since we generate the id, change it to CREATE
|
||||||
opType(IndexRequest.OpType.CREATE);
|
opType(IndexRequest.OpType.CREATE);
|
||||||
|
autoGeneratedId = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -622,6 +628,9 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
|
||||||
refresh = in.readBoolean();
|
refresh = in.readBoolean();
|
||||||
version = in.readLong();
|
version = in.readLong();
|
||||||
versionType = VersionType.fromValue(in.readByte());
|
versionType = VersionType.fromValue(in.readByte());
|
||||||
|
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
|
||||||
|
autoGeneratedId = in.readBoolean();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -638,6 +647,9 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
|
||||||
out.writeBoolean(refresh);
|
out.writeBoolean(refresh);
|
||||||
out.writeLong(version);
|
out.writeLong(version);
|
||||||
out.writeByte(versionType.getValue());
|
out.writeByte(versionType.getValue());
|
||||||
|
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
|
||||||
|
out.writeBoolean(autoGeneratedId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -191,7 +191,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
||||||
boolean created;
|
boolean created;
|
||||||
Engine.IndexingOperation op;
|
Engine.IndexingOperation op;
|
||||||
if (request.opType() == IndexRequest.OpType.INDEX) {
|
if (request.opType() == IndexRequest.OpType.INDEX) {
|
||||||
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
|
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());
|
||||||
if (index.parsedDoc().mappingsModified()) {
|
if (index.parsedDoc().mappingsModified()) {
|
||||||
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
|
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
|
||||||
}
|
}
|
||||||
|
@ -201,7 +201,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
||||||
created = index.created();
|
created = index.created();
|
||||||
} else {
|
} else {
|
||||||
Engine.Create create = indexShard.prepareCreate(sourceToParse,
|
Engine.Create create = indexShard.prepareCreate(sourceToParse,
|
||||||
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
|
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());
|
||||||
if (create.parsedDoc().mappingsModified()) {
|
if (create.parsedDoc().mappingsModified()) {
|
||||||
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
|
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
|
||||||
}
|
}
|
||||||
|
@ -235,11 +235,11 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
||||||
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
|
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
|
||||||
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
|
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
|
||||||
if (request.opType() == IndexRequest.OpType.INDEX) {
|
if (request.opType() == IndexRequest.OpType.INDEX) {
|
||||||
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
|
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
|
||||||
indexShard.index(index);
|
indexShard.index(index);
|
||||||
} else {
|
} else {
|
||||||
Engine.Create create = indexShard.prepareCreate(sourceToParse,
|
Engine.Create create = indexShard.prepareCreate(sourceToParse,
|
||||||
request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
|
request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
|
||||||
indexShard.create(create);
|
indexShard.create(create);
|
||||||
}
|
}
|
||||||
if (request.refresh()) {
|
if (request.refresh()) {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.action.support.replication;
|
package org.elasticsearch.action.support.replication;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.ActionRequest;
|
||||||
import org.elasticsearch.action.ActionRequestValidationException;
|
import org.elasticsearch.action.ActionRequestValidationException;
|
||||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||||
|
@ -45,6 +46,7 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
|
||||||
private boolean threadedOperation = true;
|
private boolean threadedOperation = true;
|
||||||
private ReplicationType replicationType = ReplicationType.DEFAULT;
|
private ReplicationType replicationType = ReplicationType.DEFAULT;
|
||||||
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
|
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
|
||||||
|
private boolean canHaveDuplicates = false;
|
||||||
|
|
||||||
protected ShardReplicationOperationRequest() {
|
protected ShardReplicationOperationRequest() {
|
||||||
|
|
||||||
|
@ -63,6 +65,17 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
|
||||||
this.consistencyLevel = request.consistencyLevel();
|
this.consistencyLevel = request.consistencyLevel();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setCanHaveDuplicates() {
|
||||||
|
this.canHaveDuplicates = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is this request can potentially be dup on a single shard.
|
||||||
|
*/
|
||||||
|
public boolean canHaveDuplicates() {
|
||||||
|
return canHaveDuplicates;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Controls if the operation will be executed on a separate thread when executed locally.
|
* Controls if the operation will be executed on a separate thread when executed locally.
|
||||||
*/
|
*/
|
||||||
|
@ -162,6 +175,9 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
|
||||||
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
|
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
|
||||||
timeout = TimeValue.readTimeValue(in);
|
timeout = TimeValue.readTimeValue(in);
|
||||||
index = in.readSharedString();
|
index = in.readSharedString();
|
||||||
|
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
|
||||||
|
canHaveDuplicates = in.readBoolean();
|
||||||
|
}
|
||||||
// no need to serialize threaded* parameters, since they only matter locally
|
// no need to serialize threaded* parameters, since they only matter locally
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,6 +188,9 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
|
||||||
out.writeByte(consistencyLevel.id());
|
out.writeByte(consistencyLevel.id());
|
||||||
timeout.writeTo(out);
|
timeout.writeTo(out);
|
||||||
out.writeSharedString(index);
|
out.writeSharedString(index);
|
||||||
|
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
|
||||||
|
out.writeBoolean(canHaveDuplicates);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
@ -559,7 +560,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
||||||
listener.onResponse(response.response());
|
listener.onResponse(response.response());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ShardRouting shard;
|
ShardRouting shard;
|
||||||
|
|
||||||
// we double check on the state, if it got changed we need to make sure we take the latest one cause
|
// we double check on the state, if it got changed we need to make sure we take the latest one cause
|
||||||
|
@ -595,6 +595,16 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
shardIt.reset();
|
||||||
|
request.setCanHaveDuplicates(); // safe side, cluster state changed, we might have dups
|
||||||
|
} else{
|
||||||
|
shardIt.reset();
|
||||||
|
while ((shard = shardIt.nextOrNull()) != null) {
|
||||||
|
if (shard.state() != ShardRoutingState.STARTED) {
|
||||||
|
request.setCanHaveDuplicates();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
shardIt.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize the counter
|
// initialize the counter
|
||||||
|
|
|
@ -384,11 +384,12 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||||
private long version;
|
private long version;
|
||||||
private final VersionType versionType;
|
private final VersionType versionType;
|
||||||
private final Origin origin;
|
private final Origin origin;
|
||||||
|
private final boolean canHaveDuplicates;
|
||||||
|
|
||||||
private final long startTime;
|
private final long startTime;
|
||||||
private long endTime;
|
private long endTime;
|
||||||
|
|
||||||
public IndexingOperation(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
|
public IndexingOperation(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime, boolean canHaveDuplicates) {
|
||||||
this.docMapper = docMapper;
|
this.docMapper = docMapper;
|
||||||
this.uid = uid;
|
this.uid = uid;
|
||||||
this.doc = doc;
|
this.doc = doc;
|
||||||
|
@ -396,10 +397,11 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||||
this.versionType = versionType;
|
this.versionType = versionType;
|
||||||
this.origin = origin;
|
this.origin = origin;
|
||||||
this.startTime = startTime;
|
this.startTime = startTime;
|
||||||
|
this.canHaveDuplicates = canHaveDuplicates;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IndexingOperation(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
|
public IndexingOperation(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
|
||||||
this(docMapper, uid, doc, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
|
this(docMapper, uid, doc, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public DocumentMapper docMapper() {
|
public DocumentMapper docMapper() {
|
||||||
|
@ -452,6 +454,10 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||||
return this.versionType;
|
return this.versionType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean canHaveDuplicates() {
|
||||||
|
return this.canHaveDuplicates;
|
||||||
|
}
|
||||||
|
|
||||||
public String parent() {
|
public String parent() {
|
||||||
return this.doc.parent();
|
return this.doc.parent();
|
||||||
}
|
}
|
||||||
|
@ -488,26 +494,41 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class Create extends IndexingOperation {
|
static final class Create extends IndexingOperation {
|
||||||
|
private final boolean autoGeneratedId;
|
||||||
|
public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime, boolean canHaveDuplicates, boolean autoGeneratedId) {
|
||||||
|
super(docMapper, uid, doc, version, versionType, origin, startTime, canHaveDuplicates);
|
||||||
|
this.autoGeneratedId = autoGeneratedId;
|
||||||
|
}
|
||||||
|
|
||||||
public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
|
public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
|
||||||
super(docMapper, uid, doc, version, versionType, origin, startTime);
|
this(docMapper, uid, doc, version, versionType, origin, startTime, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
|
public Create(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
|
||||||
super(docMapper, uid, doc);
|
super(docMapper, uid, doc);
|
||||||
|
autoGeneratedId = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Type opType() {
|
public Type opType() {
|
||||||
return Type.CREATE;
|
return Type.CREATE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public boolean autoGeneratedId() {
|
||||||
|
return this.autoGeneratedId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class Index extends IndexingOperation {
|
static final class Index extends IndexingOperation {
|
||||||
private boolean created;
|
private boolean created;
|
||||||
|
|
||||||
|
public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime, boolean canHaveDuplicates) {
|
||||||
|
super(docMapper, uid, doc, version, versionType, origin, startTime, canHaveDuplicates);
|
||||||
|
}
|
||||||
|
|
||||||
public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
|
public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
|
||||||
super(docMapper, uid, doc, version, versionType, origin, startTime);
|
super(docMapper, uid, doc, version, versionType, origin, startTime, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
|
public Index(DocumentMapper docMapper, Term uid, ParsedDocument doc) {
|
||||||
|
|
|
@ -101,6 +101,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
private long gcDeletesInMillis;
|
private long gcDeletesInMillis;
|
||||||
private volatile boolean enableGcDeletes = true;
|
private volatile boolean enableGcDeletes = true;
|
||||||
private volatile String codecName;
|
private volatile String codecName;
|
||||||
|
private final boolean optimizeAutoGenerateId;
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
|
@ -197,6 +198,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
for (int i = 0; i < dirtyLocks.length; i++) {
|
for (int i = 0; i < dirtyLocks.length; i++) {
|
||||||
dirtyLocks[i] = new Object();
|
dirtyLocks[i] = new Object();
|
||||||
}
|
}
|
||||||
|
this.optimizeAutoGenerateId = indexSettings.getAsBoolean("index.optimize_auto_generated_id", true);
|
||||||
|
|
||||||
this.indexSettingsService.addListener(applySettings);
|
this.indexSettingsService.addListener(applySettings);
|
||||||
|
|
||||||
|
@ -394,14 +396,20 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
synchronized (dirtyLock(create.uid())) {
|
synchronized (dirtyLock(create.uid())) {
|
||||||
HashedBytesRef versionKey = versionKey(create.uid());
|
HashedBytesRef versionKey = versionKey(create.uid());
|
||||||
final long currentVersion;
|
final long currentVersion;
|
||||||
VersionValue versionValue = versionMap.get(versionKey);
|
final VersionValue versionValue;
|
||||||
if (versionValue == null) {
|
if (optimizeAutoGenerateId && create.autoGeneratedId() && !create.canHaveDuplicates()) {
|
||||||
currentVersion = loadCurrentVersionFromIndex(create.uid());
|
currentVersion = Versions.NOT_FOUND;
|
||||||
|
versionValue = null;
|
||||||
} else {
|
} else {
|
||||||
if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {
|
versionValue = versionMap.get(versionKey);
|
||||||
currentVersion = Versions.NOT_FOUND; // deleted, and GC
|
if (versionValue == null) {
|
||||||
|
currentVersion = loadCurrentVersionFromIndex(create.uid());
|
||||||
} else {
|
} else {
|
||||||
currentVersion = versionValue.version();
|
if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {
|
||||||
|
currentVersion = Versions.NOT_FOUND; // deleted, and GC
|
||||||
|
} else {
|
||||||
|
currentVersion = versionValue.version();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -129,11 +129,11 @@ public interface IndexShard extends IndexShardComponent {
|
||||||
|
|
||||||
IndexShardState state();
|
IndexShardState state();
|
||||||
|
|
||||||
Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException;
|
Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) throws ElasticsearchException;
|
||||||
|
|
||||||
ParsedDocument create(Engine.Create create) throws ElasticsearchException;
|
ParsedDocument create(Engine.Create create) throws ElasticsearchException;
|
||||||
|
|
||||||
Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException;
|
Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException;
|
||||||
|
|
||||||
ParsedDocument index(Engine.Index index) throws ElasticsearchException;
|
ParsedDocument index(Engine.Index index) throws ElasticsearchException;
|
||||||
|
|
||||||
|
|
|
@ -367,11 +367,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException {
|
public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) throws ElasticsearchException {
|
||||||
long startTime = System.nanoTime();
|
long startTime = System.nanoTime();
|
||||||
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
|
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
|
||||||
ParsedDocument doc = docMapper.parse(source);
|
ParsedDocument doc = docMapper.parse(source);
|
||||||
return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime);
|
return new Engine.Create(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -388,11 +388,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) throws ElasticsearchException {
|
public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) throws ElasticsearchException {
|
||||||
long startTime = System.nanoTime();
|
long startTime = System.nanoTime();
|
||||||
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
|
DocumentMapper docMapper = mapperService.documentMapperWithAutoCreate(source.type());
|
||||||
ParsedDocument doc = docMapper.parse(source);
|
ParsedDocument doc = docMapper.parse(source);
|
||||||
return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime);
|
return new Engine.Index(docMapper, docMapper.uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, state != IndexShardState.STARTED || canHaveDuplicates);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -747,13 +747,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
engine.create(prepareCreate(
|
engine.create(prepareCreate(
|
||||||
source(create.source()).type(create.type()).id(create.id())
|
source(create.source()).type(create.type()).id(create.id())
|
||||||
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
|
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
|
||||||
create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY));
|
create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false));
|
||||||
break;
|
break;
|
||||||
case SAVE:
|
case SAVE:
|
||||||
Translog.Index index = (Translog.Index) operation;
|
Translog.Index index = (Translog.Index) operation;
|
||||||
engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id())
|
engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id())
|
||||||
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
|
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
|
||||||
index.version(),index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY));
|
index.version(),index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true));
|
||||||
break;
|
break;
|
||||||
case DELETE:
|
case DELETE:
|
||||||
Translog.Delete delete = (Translog.Delete) operation;
|
Translog.Delete delete = (Translog.Delete) operation;
|
||||||
|
|
|
@ -50,6 +50,7 @@ public class SingleThreadBulkStress {
|
||||||
|
|
||||||
int shardsCount = Integer.parseInt(System.getProperty("es.shards", "1"));
|
int shardsCount = Integer.parseInt(System.getProperty("es.shards", "1"));
|
||||||
int replicaCount = Integer.parseInt(System.getProperty("es.replica", "1"));
|
int replicaCount = Integer.parseInt(System.getProperty("es.replica", "1"));
|
||||||
|
boolean autoGenerateId = true;
|
||||||
|
|
||||||
Settings settings = settingsBuilder()
|
Settings settings = settingsBuilder()
|
||||||
.put("index.refresh_interval", "1s")
|
.put("index.refresh_interval", "1s")
|
||||||
|
@ -94,7 +95,7 @@ public class SingleThreadBulkStress {
|
||||||
BulkRequestBuilder request = client1.prepareBulk();
|
BulkRequestBuilder request = client1.prepareBulk();
|
||||||
for (int j = 0; j < BATCH; j++) {
|
for (int j = 0; j < BATCH; j++) {
|
||||||
counter++;
|
counter++;
|
||||||
request.add(Requests.indexRequest("test").type("type1").id(Integer.toString(counter)).source(source(Integer.toString(counter), "test" + counter)));
|
request.add(Requests.indexRequest("test").type("type1").id(autoGenerateId ? null : Integer.toString(counter)).source(source(Integer.toString(counter), "test" + counter)));
|
||||||
}
|
}
|
||||||
BulkResponse response = request.execute().actionGet();
|
BulkResponse response = request.execute().actionGet();
|
||||||
if (response.hasFailures()) {
|
if (response.hasFailures()) {
|
||||||
|
|
Loading…
Reference in New Issue