Internal: Remove the disabled autogenerated id optimization from InternalEngine
If a document is indexed into ES without an id, ES generates one for it. We used to have an optimization for this case where the engine would not try to resolve the ids of these requests against the existing index but would immediately index them. This optimization proved to be a source of brittle bugs (since fixed), and we disabled it in 1.5, preparing to remove it if no performance degradation was found. Since we haven't seen any such degradation, we can remove it.

Along with the optimization we can remove the autoGeneratedId flag on indexing requests and the canHaveDuplicates flag. The only downside of removing the canHaveDuplicates flag is that we can no longer make sure that, when we retry a create operation with an autogenerated id, we ignore any document-already-exists exception (see #9125 for background and discussion). To work around this, we no longer set the operation type to CREATE when we generate an id, so the resulting request will never fail when it finds an existing doc; it will instead return a version of 2. I think that's acceptable.

Closes #13857
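The behaviour change in practice, as a minimal sketch against the Java client API of that era. The client wiring, index name, and document below are placeholder assumptions, and a real retry happens inside the transport layer; the second call only simulates it by reusing the generated id.

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;

public class AutoIdRetrySketch {
    // Sketch only: "my_index", "my_type" and the source document are made up.
    public static void demo(Client client) {
        // No id is set, so the coordinating node generates one. After this change the
        // request keeps OpType.INDEX instead of being switched to OpType.CREATE.
        IndexResponse first = client.prepareIndex("my_index", "my_type")
                .setSource("{\"field\":\"value\"}")
                .get();

        // Simulating a transport-level retry of the same auto-id request by reusing the
        // generated id: it can no longer fail with a document-already-exists error, it
        // simply indexes again and reports version 2.
        IndexResponse retried = client.prepareIndex("my_index", "my_type", first.getId())
                .setSource("{\"field\":\"value\"}")
                .get();

        assert first.getVersion() == 1;
        assert retried.getVersion() == 2;
    }
}

Before this change the retried request would have been a CREATE carrying the autoGeneratedId flag, and suppressing the document-already-exists failure depended on the canHaveDuplicates flag.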
parent f526754d0e
commit 169d06cf9e
@@ -462,12 +462,12 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha

         final Engine.IndexingOperation operation;
         if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
-            operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates());
+            operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA);
         } else {
             assert indexRequest.opType() == IndexRequest.OpType.CREATE : indexRequest.opType();
             operation = indexShard.prepareCreate(sourceToParse,
                     indexRequest.version(), indexRequest.versionType(),
-                    Engine.Operation.Origin.REPLICA, request.canHaveDuplicates() || indexRequest.canHaveDuplicates(), indexRequest.autoGeneratedId());
+                    Engine.Operation.Origin.REPLICA);
         }
         Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
         if (update != null) {

@@ -19,7 +19,6 @@

 package org.elasticsearch.action.index;

-import java.nio.charset.StandardCharsets;
 import org.elasticsearch.ElasticsearchGenerationException;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.Version;
@@ -41,6 +40,7 @@ import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;

 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Locale;
 import java.util.Map;

@@ -139,7 +139,6 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
     private BytesReference source;

     private OpType opType = OpType.INDEX;
-    private boolean autoGeneratedId = false;

     private boolean refresh = false;
     private long version = Versions.MATCH_ANY;
@@ -172,7 +171,6 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
         this.ttl = indexRequest.ttl;
         this.source = indexRequest.source;
         this.opType = indexRequest.opType;
-        this.autoGeneratedId = indexRequest.autoGeneratedId;
         this.refresh = indexRequest.refresh;
         this.version = indexRequest.version;
         this.versionType = indexRequest.versionType;
@@ -551,13 +549,6 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
         return this.versionType;
     }

-    /**
-     * Has the id been auto generated?
-     */
-    public boolean autoGeneratedId() {
-        return this.autoGeneratedId;
-    }
-
     public void process(MetaData metaData, @Nullable MappingMetaData mappingMd, boolean allowIdGeneration, String concreteIndex) {
         // resolve the routing if needed
         routing(metaData.resolveIndexRouting(routing, index));
@@ -622,9 +613,6 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
         if (allowIdGeneration) {
             if (id == null) {
                 id(Strings.base64UUID());
-                // since we generate the id, change it to CREATE
-                opType(IndexRequest.OpType.CREATE);
-                autoGeneratedId = true;
             }
         }

@@ -663,7 +651,6 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
         refresh = in.readBoolean();
         version = in.readLong();
         versionType = VersionType.fromValue(in.readByte());
-        autoGeneratedId = in.readBoolean();
     }

     @Override
@@ -680,7 +667,6 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
         out.writeBoolean(refresh);
         out.writeLong(version);
         out.writeByte(versionType.getValue());
-        out.writeBoolean(autoGeneratedId);
     }

     @Override

@@ -42,7 +42,6 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.engine.EngineClosedException;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.shard.IndexShard;
@@ -183,10 +182,10 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques

         final Engine.IndexingOperation operation;
         if (request.opType() == IndexRequest.OpType.INDEX) {
-            operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates());
+            operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
         } else {
             assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
-            operation = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA, request.canHaveDuplicates(), request.autoGeneratedId());
+            operation = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
         }
         Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
         if (update != null) {

@@ -48,7 +48,6 @@ public class ReplicationRequest<T extends ReplicationRequest> extends ActionRequ
     protected String index;

     private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
-    private volatile boolean canHaveDuplicates = false;

     public ReplicationRequest() {

@@ -79,17 +78,6 @@ public class ReplicationRequest<T extends ReplicationRequest> extends ActionRequ
         this.consistencyLevel = request.consistencyLevel();
     }

-    void setCanHaveDuplicates() {
-        this.canHaveDuplicates = true;
-    }
-
-    /**
-     * Is this request can potentially be dup on a single shard.
-     */
-    public boolean canHaveDuplicates() {
-        return canHaveDuplicates;
-    }
-
     /**
      * A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
      */
@@ -171,8 +159,6 @@ public class ReplicationRequest<T extends ReplicationRequest> extends ActionRequ
         consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
         timeout = TimeValue.readTimeValue(in);
         index = in.readString();
-        canHaveDuplicates = in.readBoolean();
-        // no need to serialize threaded* parameters, since they only matter locally
     }

     @Override
@@ -182,7 +168,6 @@ public class ReplicationRequest<T extends ReplicationRequest> extends ActionRequ
         out.writeByte(consistencyLevel.id());
         timeout.writeTo(out);
         out.writeString(index);
-        out.writeBoolean(canHaveDuplicates);
     }

     public T setShardId(ShardId shardId) {

@@ -42,7 +42,10 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.*;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.ShardIterator;
+import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -481,7 +484,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
                 // if we got disconnected from the node, or the node / shard is not in the right state (being closed)
                 if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException ||
                         retryPrimaryException(exp)) {
-                    internalRequest.request().setCanHaveDuplicates();
                     // we already marked it as started when we executed it (removed the listener) so pass false
                     // to re-add to the cluster listener
                     logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage());
@@ -581,7 +583,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
                 logger.trace("operation completed on primary [{}]", primary);
                 replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference);
             } catch (Throwable e) {
-                internalRequest.request.setCanHaveDuplicates();
                 // shard has not been allocated yet, retry it here
                 if (retryPrimaryException(e)) {
                     logger.trace("had an error while performing operation on primary ({}), scheduling a retry.", e.getMessage());
@@ -762,14 +763,10 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
                         numberOfPendingShardInstances++;
                     }
                 }
-                internalRequest.request().setCanHaveDuplicates(); // safe side, cluster state changed, we might have dups
             } else {
                 shardIt = originalShardIt;
                 shardIt.reset();
                 while ((shard = shardIt.nextOrNull()) != null) {
-                    if (shard.state() != ShardRoutingState.STARTED) {
-                        replicaRequest.setCanHaveDuplicates();
-                    }
                     if (shard.unassigned()) {
                         numberOfUnassignedOrIgnoredReplicas++;
                     } else if (shard.primary()) {
@@ -1042,16 +1039,12 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
     private final Engine.IndexingOperation prepareIndexOperationOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) {
         SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).index(request.index()).type(request.type()).id(request.id())
                 .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
-        boolean canHaveDuplicates = request.canHaveDuplicates();
-        if (shardRequest != null) {
-            canHaveDuplicates |= shardRequest.canHaveDuplicates();
-        }
         if (request.opType() == IndexRequest.OpType.INDEX) {
-            return indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, canHaveDuplicates);
+            return indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
         } else {
             assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
             return indexShard.prepareCreate(sourceToParse,
-                    request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, canHaveDuplicates, canHaveDuplicates);
+                    request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
         }
     }

@@ -52,12 +52,7 @@ import org.elasticsearch.index.translog.Translog;

 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
@@ -632,24 +627,22 @@ public abstract class Engine implements Closeable {
         private long version;
         private final VersionType versionType;
         private final Origin origin;
-        private final boolean canHaveDuplicates;
         private Translog.Location location;

         private final long startTime;
         private long endTime;

-        public IndexingOperation(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime, boolean canHaveDuplicates) {
+        public IndexingOperation(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
             this.uid = uid;
             this.doc = doc;
             this.version = version;
             this.versionType = versionType;
             this.origin = origin;
             this.startTime = startTime;
-            this.canHaveDuplicates = canHaveDuplicates;
         }

         public IndexingOperation(Term uid, ParsedDocument doc) {
-            this(uid, doc, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), true);
+            this(uid, doc, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
         }

         @Override
@@ -706,10 +699,6 @@ public abstract class Engine implements Closeable {
             return this.versionType;
         }

-        public boolean canHaveDuplicates() {
-            return this.canHaveDuplicates;
-        }
-
         public String parent() {
             return this.doc.parent();
         }
@@ -748,20 +737,13 @@ public abstract class Engine implements Closeable {
     }

     public static final class Create extends IndexingOperation {
-        private final boolean autoGeneratedId;

-        public Create(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime, boolean canHaveDuplicates, boolean autoGeneratedId) {
-            super(uid, doc, version, versionType, origin, startTime, canHaveDuplicates);
-            this.autoGeneratedId = autoGeneratedId;
-        }
-
         public Create(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
-            this(uid, doc, version, versionType, origin, startTime, true, false);
+            super(uid, doc, version, versionType, origin, startTime);
         }

         public Create(Term uid, ParsedDocument doc) {
             super(uid, doc);
-            autoGeneratedId = false;
         }

         @Override
@@ -769,11 +751,6 @@ public abstract class Engine implements Closeable {
             return Type.CREATE;
         }

-
-        public boolean autoGeneratedId() {
-            return this.autoGeneratedId;
-        }
-
         @Override
         public boolean execute(IndexShard shard) {
             shard.create(this);
@@ -783,12 +760,8 @@ public abstract class Engine implements Closeable {

     public static final class Index extends IndexingOperation {

-        public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime, boolean canHaveDuplicates) {
-            super(uid, doc, version, versionType, origin, startTime, canHaveDuplicates);
-        }
-
         public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
-            super(uid, doc, version, versionType, origin, startTime, true);
+            super(uid, doc, version, versionType, origin, startTime);
         }

         public Index(Term uid, ParsedDocument doc) {

@@ -58,7 +58,6 @@ public final class EngineConfig {
     private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
     private volatile boolean enableGcDeletes = true;
     private final String codecName;
-    private final boolean optimizeAutoGenerateId;
     private final ThreadPool threadPool;
     private final ShardIndexingService indexingService;
     @Nullable
@@ -81,12 +80,6 @@ public final class EngineConfig {
      */
     public static final String INDEX_COMPOUND_ON_FLUSH = "index.compound_on_flush";

-    /**
-     * Setting to control auto generated ID optimizations. Default is <code>true</code> if not present.
-     * This setting is <b>not</b> realtime updateable.
-     */
-    public static final String INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING = "index.optimize_auto_generated_id";
-
     /**
      * Index setting to enable / disable deletes garbage collection.
      * This setting is realtime updateable
@@ -143,7 +136,6 @@ public final class EngineConfig {
         this.codecService = codecService;
         this.failedEngineListener = failedEngineListener;
         this.wrappingService = wrappingService;
-        this.optimizeAutoGenerateId = indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false);
         this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
         codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
         indexingBufferSize = DEFAULT_INDEX_BUFFER_SIZE;
@@ -255,14 +247,6 @@ public final class EngineConfig {
         return codecService.codec(codecName);
     }

-    /**
-     * Returns <code>true</code> iff documents with auto-generated IDs are optimized if possible. This mainly means that
-     * they are simply appended to the index if no update call is necessary.
-     */
-    public boolean isOptimizeAutoGenerateId() {
-        return optimizeAutoGenerateId;
-    }
-
     /**
      * Returns a thread-pool mainly used to get estimated time stamps from {@link org.elasticsearch.threadpool.ThreadPool#estimatedTimeInMillis()} and to schedule
      * async force merge calls on the {@link org.elasticsearch.threadpool.ThreadPool.Names#OPTIMIZE} thread-pool

@@ -22,11 +22,7 @@ package org.elasticsearch.index.engine;
 import org.apache.lucene.index.*;
 import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
 import org.apache.lucene.search.BooleanClause.Occur;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.SearcherFactory;
-import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.*;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.LockObtainFailedException;
@@ -65,12 +61,7 @@ import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;

 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -247,7 +238,7 @@ public class InternalEngine extends Engine {
                 logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
                         opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog.currentFileGeneration());
                 flush(true, true);
-            } else if (translog.isCurrent(translogGeneration) == false){
+            } else if (translog.isCurrent(translogGeneration) == false) {
                 commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
             }
         }
@@ -357,29 +348,24 @@ public class InternalEngine extends Engine {
     }

     private void innerCreate(Create create) throws IOException {
-        if (engineConfig.isOptimizeAutoGenerateId() && create.autoGeneratedId() && !create.canHaveDuplicates()) {
-            // We don't need to lock because this ID cannot be concurrently updated:
-            innerCreateNoLock(create, Versions.NOT_FOUND, null);
-        } else {
-            synchronized (dirtyLock(create.uid())) {
-                final long currentVersion;
-                final VersionValue versionValue;
-                versionValue = versionMap.getUnderLock(create.uid().bytes());
-                if (versionValue == null) {
-                    currentVersion = loadCurrentVersionFromIndex(create.uid());
-                } else {
-                    if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
-                        currentVersion = Versions.NOT_FOUND; // deleted, and GC
-                    } else {
-                        currentVersion = versionValue.version();
-                    }
-                }
-                innerCreateNoLock(create, currentVersion, versionValue);
-            }
+        synchronized (dirtyLock(create.uid())) {
+            final long currentVersion;
+            final VersionValue versionValue;
+            versionValue = versionMap.getUnderLock(create.uid().bytes());
+            if (versionValue == null) {
+                currentVersion = loadCurrentVersionFromIndex(create.uid());
+            } else {
+                if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
+                    currentVersion = Versions.NOT_FOUND; // deleted, and GC
+                } else {
+                    currentVersion = versionValue.version();
+                }
+            }
+            innerCreateUnderLock(create, currentVersion, versionValue);
         }
     }

-    private void innerCreateNoLock(Create create, long currentVersion, VersionValue versionValue) throws IOException {
+    private void innerCreateUnderLock(Create create, long currentVersion, VersionValue versionValue) throws IOException {

         // same logic as index
         long updatedVersion;
@@ -402,17 +388,6 @@ public class InternalEngine extends Engine {
                 // #7142: the primary already determined it's OK to index this document, and we confirmed above that the version doesn't
                 // conflict, so we must also update here on the replica to remain consistent:
                 doUpdate = true;
-            } else if (create.origin() == Operation.Origin.PRIMARY && create.autoGeneratedId() && create.canHaveDuplicates() && currentVersion == 1 && create.version() == Versions.MATCH_ANY) {
-                /**
-                 * If bulk index request fails due to a disconnect, unavailable shard etc. then the request is
-                 * retried before it actually fails. However, the documents might already be indexed.
-                 * For autogenerated ids this means that a version conflict will be reported in the bulk request
-                 * although the document was indexed properly.
-                 * To avoid this we have to make sure that the index request is treated as an update and set updatedVersion to 1.
-                 * See also discussion on https://github.com/elasticsearch/elasticsearch/pull/9125
-                 */
-                doUpdate = true;
-                updatedVersion = 1;
             } else {
                 // On primary, we throw DAEE if the _uid is already in the index with an older version:
                 assert create.origin() == Operation.Origin.PRIMARY;
@@ -636,9 +611,9 @@ public class InternalEngine extends Engine {
             Query query = delete.query();
             if (delete.aliasFilter() != null) {
                 query = new BooleanQuery.Builder()
-                    .add(query, Occur.MUST)
-                    .add(delete.aliasFilter(), Occur.FILTER)
-                    .build();
+                        .add(query, Occur.MUST)
+                        .add(delete.aliasFilter(), Occur.FILTER)
+                        .build();
             }
             if (delete.nested()) {
                 query = new IncludeNestedDocsQuery(query, delete.parentFilter());
@@ -1071,6 +1046,7 @@ public class InternalEngine extends Engine {
             throw ex;
         }
     }
+
     /** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
     final static class SearchFactory extends EngineSearcherFactory {
         private final IndicesWarmer warmer;
@@ -1192,6 +1168,7 @@ public class InternalEngine extends Engine {
                 }
             }
         }
+
         @Override
         protected void handleMergeException(final Directory dir, final Throwable exc) {
             logger.error("failed to merge", exc);

@@ -19,7 +19,6 @@

 package org.elasticsearch.index.shard;

-import java.nio.charset.StandardCharsets;
 import org.apache.lucene.codecs.PostingsFormat;
 import org.apache.lucene.index.CheckIndex;
 import org.apache.lucene.index.IndexCommit;
@@ -113,6 +112,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.channels.ClosedByInterruptException;
+import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledFuture;
@@ -428,22 +428,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
         return previousState;
     }

-    public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) {
+    public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) {
         try {
-            return prepareCreate(docMapper(source.type()), source, version, versionType, origin, state != IndexShardState.STARTED || canHaveDuplicates, autoGeneratedId);
+            return prepareCreate(docMapper(source.type()), source, version, versionType, origin);
         } catch (Throwable t) {
            verifyNotClosed(t);
            throw t;
         }
     }

-    static Engine.Create prepareCreate(DocumentMapperForType docMapper, SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates, boolean autoGeneratedId) {
+    static Engine.Create prepareCreate(DocumentMapperForType docMapper, SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) {
         long startTime = System.nanoTime();
         ParsedDocument doc = docMapper.getDocumentMapper().parse(source);
         if (docMapper.getMapping() != null) {
             doc.addDynamicMappingsUpdate(docMapper.getMapping());
         }
-        return new Engine.Create(docMapper.getDocumentMapper().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, canHaveDuplicates, autoGeneratedId);
+        return new Engine.Create(docMapper.getDocumentMapper().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime);
     }

     public void create(Engine.Create create) {
@@ -462,22 +462,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
         indexingService.postCreate(create);
     }

-    public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) {
+    public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) {
         try {
-            return prepareIndex(docMapper(source.type()), source, version, versionType, origin, state != IndexShardState.STARTED || canHaveDuplicates);
+            return prepareIndex(docMapper(source.type()), source, version, versionType, origin);
         } catch (Throwable t) {
            verifyNotClosed(t);
            throw t;
         }
     }

-    static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin, boolean canHaveDuplicates) {
+    static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) {
         long startTime = System.nanoTime();
         ParsedDocument doc = docMapper.getDocumentMapper().parse(source);
         if (docMapper.getMapping() != null) {
             doc.addDynamicMappingsUpdate(docMapper.getMapping());
         }
-        return new Engine.Index(docMapper.getDocumentMapper().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime, canHaveDuplicates);
+        return new Engine.Index(docMapper.getDocumentMapper().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime);
     }

     /**

@@ -23,6 +23,7 @@ import org.apache.lucene.search.join.BitSetProducer;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.ParsingException;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -38,7 +39,6 @@ import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
 import org.elasticsearch.index.mapper.*;
 import org.elasticsearch.index.query.IndexQueryParserService;
 import org.elasticsearch.index.query.ParsedQuery;
-import org.elasticsearch.common.ParsingException;
 import org.elasticsearch.index.translog.Translog;

 import java.io.IOException;
@@ -150,7 +150,7 @@ public class TranslogRecoveryPerformer {
                 Engine.Create engineCreate = IndexShard.prepareCreate(docMapper(create.type()),
                         source(create.source()).index(shardId.getIndex()).type(create.type()).id(create.id())
                                 .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
-                        create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false);
+                        create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY);
                 maybeAddMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate(), engineCreate.id(), allowMappingUpdates);
                 if (logger.isTraceEnabled()) {
                     logger.trace("[translog] recover [create] op of [{}][{}]", create.type(), create.id());
@@ -161,7 +161,7 @@ public class TranslogRecoveryPerformer {
                 Translog.Index index = (Translog.Index) operation;
                 Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), source(index.source()).type(index.type()).id(index.id())
                                 .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
-                        index.version(), index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true);
+                        index.version(), index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY);
                 maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
                 if (logger.isTraceEnabled()) {
                     logger.trace("[translog] recover [index] op of [{}][{}]", index.type(), index.id());

@@ -210,7 +210,7 @@ public class DocumentActionsIT extends ESIntegTestCase {
         assertThat(bulkResponse.getItems()[1].getId(), equalTo("2"));

         assertThat(bulkResponse.getItems()[2].isFailed(), equalTo(false));
-        assertThat(bulkResponse.getItems()[2].getOpType(), equalTo("create"));
+        assertThat(bulkResponse.getItems()[2].getOpType(), equalTo("index"));
         assertThat(bulkResponse.getItems()[2].getIndex(), equalTo(getConcreteIndexName()));
         assertThat(bulkResponse.getItems()[2].getType(), equalTo("type1"));
         String generatedId3 = bulkResponse.getItems()[2].getId();
@@ -222,7 +222,7 @@ public class DocumentActionsIT extends ESIntegTestCase {
         assertThat(bulkResponse.getItems()[3].getId(), equalTo("1"));

         assertThat(bulkResponse.getItems()[4].isFailed(), equalTo(true));
-        assertThat(bulkResponse.getItems()[4].getOpType(), equalTo("create"));
+        assertThat(bulkResponse.getItems()[4].getOpType(), equalTo("index"));
         assertThat(bulkResponse.getItems()[4].getIndex(), equalTo(getConcreteIndexName()));
         assertThat(bulkResponse.getItems()[4].getType(), equalTo("type1"));

@@ -20,7 +20,6 @@
 package org.elasticsearch.index.engine;

 import com.google.common.collect.ImmutableMap;
-
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
@@ -1431,7 +1430,7 @@ public class InternalEngineTests extends ESTestCase {
         document.add(new TextField("value", "test1", Field.Store.YES));

         ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_2, null);
-        engine.index(new Engine.Index(newUid("1"), doc, 1, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
+        engine.index(new Engine.Index(newUid("1"), doc, 1, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));

         // Delete document we just added:
         engine.delete(new Engine.Delete("test", "1", newUid("1"), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
@@ -1547,89 +1546,6 @@ public class InternalEngineTests extends ESTestCase {
         assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
     }

-    @Test
-    public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOException {
-
-        ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
-        boolean canHaveDuplicates = false;
-        boolean autoGeneratedId = true;
-
-        Engine.Create index = new Engine.Create(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
-        engine.create(index);
-        assertThat(index.version(), equalTo(1l));
-
-        index = new Engine.Create(newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
-        replicaEngine.create(index);
-        assertThat(index.version(), equalTo(1l));
-
-        canHaveDuplicates = true;
-        index = new Engine.Create(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
-        engine.create(index);
-        assertThat(index.version(), equalTo(1l));
-        engine.refresh("test");
-        Engine.Searcher searcher = engine.acquireSearcher("test");
-        TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
-        assertThat(topDocs.totalHits, equalTo(1));
-
-        index = new Engine.Create(newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
-        try {
-            replicaEngine.create(index);
-            fail();
-        } catch (VersionConflictEngineException e) {
-            // we ignore version conflicts on replicas, see TransportReplicationAction.ignoreReplicaException
-        }
-        replicaEngine.refresh("test");
-        Engine.Searcher replicaSearcher = replicaEngine.acquireSearcher("test");
-        topDocs = replicaSearcher.searcher().search(new MatchAllDocsQuery(), 10);
-        assertThat(topDocs.totalHits, equalTo(1));
-        searcher.close();
-        replicaSearcher.close();
-    }
-
-    @Test
-    public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() throws IOException {
-
-        ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
-        boolean canHaveDuplicates = true;
-        boolean autoGeneratedId = true;
-
-        Engine.Create firstIndexRequest = new Engine.Create(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
-        engine.create(firstIndexRequest);
-        assertThat(firstIndexRequest.version(), equalTo(1l));
-
-        Engine.Create firstIndexRequestReplica = new Engine.Create(newUid("1"), doc, firstIndexRequest.version(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
-        replicaEngine.create(firstIndexRequestReplica);
-        assertThat(firstIndexRequestReplica.version(), equalTo(1l));
-
-        canHaveDuplicates = false;
-        Engine.Create secondIndexRequest = new Engine.Create(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
-        try {
-            engine.create(secondIndexRequest);
-            fail();
-        } catch (DocumentAlreadyExistsException e) {
-            // we can ignore the exception. In case this happens because the retry request arrived first then this error will not be sent back anyway.
-            // in any other case this is an actual error
-        }
-        engine.refresh("test");
-        Engine.Searcher searcher = engine.acquireSearcher("test");
-        TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
-        assertThat(topDocs.totalHits, equalTo(1));
-
-        Engine.Create secondIndexRequestReplica = new Engine.Create(newUid("1"), doc, firstIndexRequest.version(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
-        try {
-            replicaEngine.create(secondIndexRequestReplica);
-            fail();
-        } catch (VersionConflictEngineException e) {
-            // we ignore version conflicts on replicas, see TransportReplicationAction.ignoreReplicaException.
-        }
-        replicaEngine.refresh("test");
-        Engine.Searcher replicaSearcher = replicaEngine.acquireSearcher("test");
-        topDocs = replicaSearcher.searcher().search(new MatchAllDocsQuery(), 10);
-        assertThat(topDocs.totalHits, equalTo(1));
-        searcher.close();
-        replicaSearcher.close();
-    }
-
     // #10312
     @Test
     public void testDeletesAloneCanTriggerRefresh() throws Exception {
@@ -1689,12 +1605,10 @@ public class InternalEngineTests extends ESTestCase {
     }

     public void testTranslogReplayWithFailure() throws IOException {
-        boolean canHaveDuplicates = true;
-        boolean autoGeneratedId = true;
         final int numDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numDocs; i++) {
             ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-            Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
+            Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
             engine.create(firstIndexRequest);
             assertThat(firstIndexRequest.version(), equalTo(1l));
         }
@@ -1744,12 +1658,10 @@ public class InternalEngineTests extends ESTestCase {

     @Test
     public void testSkipTranslogReplay() throws IOException {
-        boolean canHaveDuplicates = true;
-        boolean autoGeneratedId = true;
         final int numDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numDocs; i++) {
             ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-            Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
+            Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
             engine.create(firstIndexRequest);
             assertThat(firstIndexRequest.version(), equalTo(1l));
         }
@@ -1850,7 +1762,7 @@ public class InternalEngineTests extends ESTestCase {
         final int numExtraDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numExtraDocs; i++) {
             ParsedDocument doc = testParsedDocument("extra" + Integer.toString(i), "extra" + Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-            Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), false, false);
+            Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
             engine.create(firstIndexRequest);
             assertThat(firstIndexRequest.version(), equalTo(1l));
         }
@@ -1876,12 +1788,10 @@ public class InternalEngineTests extends ESTestCase {
     }

     public void testTranslogReplay() throws IOException {
-        boolean canHaveDuplicates = true;
-        boolean autoGeneratedId = true;
         final int numDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numDocs; i++) {
             ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-            Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
+            Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
             engine.create(firstIndexRequest);
             assertThat(firstIndexRequest.version(), equalTo(1l));
         }
@@ -1930,7 +1840,7 @@ public class InternalEngineTests extends ESTestCase {
         int randomId = randomIntBetween(numDocs + 1, numDocs + 10);
         String uuidValue = "test#" + Integer.toString(randomId);
         ParsedDocument doc = testParsedDocument(uuidValue, Integer.toString(randomId), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-        Engine.Create firstIndexRequest = new Engine.Create(newUid(uuidValue), doc, 1, VersionType.EXTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
+        Engine.Create firstIndexRequest = new Engine.Create(newUid(uuidValue), doc, 1, VersionType.EXTERNAL, PRIMARY, System.nanoTime());
         engine.create(firstIndexRequest);
         assertThat(firstIndexRequest.version(), equalTo(1l));
         if (flush) {
@@ -2008,12 +1918,10 @@ public class InternalEngineTests extends ESTestCase {
     }

     public void testRecoverFromForeignTranslog() throws IOException {
-        boolean canHaveDuplicates = true;
-        boolean autoGeneratedId = true;
         final int numDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numDocs; i++) {
             ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-            Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), canHaveDuplicates, autoGeneratedId);
+            Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
             engine.create(firstIndexRequest);
             assertThat(firstIndexRequest.version(), equalTo(1l));
         }