Engine: Remove Engine.Create

The `_create` API is handy way to specify an index operation should only be done if the document doesn't exist. This is currently implemented in explicit code paths all the way down to the engine. However, conceptually this is no different than any other versioned operation - instead of requiring a document is on a specific version, we require it to be deleted (or non-existent). This PR removes Engine.Create in favor of a slight extension in the VersionType logic.

There are however a couple of side effects:
- DocumentAlreadyExistsException is removed and VersionConflictException is used instead (with an improved error message)
- Update will reject version parameters if the upsert option is used (it doesn't compute anyway).
- Translog.Create is also removed infavor of Translog.Index (that's OK because their binary format was the same, so we can just read Translog.Index of the translog file)

Closes #13955
This commit is contained in:
Boaz Leskes 2015-10-02 14:08:10 +02:00
parent 8a590d46b8
commit bcb3fab6ac
41 changed files with 772 additions and 1511 deletions

View File

@ -482,7 +482,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
RESOURCE_NOT_FOUND_EXCEPTION(org.elasticsearch.ResourceNotFoundException.class, org.elasticsearch.ResourceNotFoundException::new, 19),
ACTION_TRANSPORT_EXCEPTION(org.elasticsearch.transport.ActionTransportException.class, org.elasticsearch.transport.ActionTransportException::new, 20),
ELASTICSEARCH_GENERATION_EXCEPTION(org.elasticsearch.ElasticsearchGenerationException.class, org.elasticsearch.ElasticsearchGenerationException::new, 21),
CREATE_FAILED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.CreateFailedEngineException.class, org.elasticsearch.index.engine.CreateFailedEngineException::new, 22),
// 22 was CreateFailedEngineException
INDEX_SHARD_STARTED_EXCEPTION(org.elasticsearch.index.shard.IndexShardStartedException.class, org.elasticsearch.index.shard.IndexShardStartedException::new, 23),
SEARCH_CONTEXT_MISSING_EXCEPTION(org.elasticsearch.search.SearchContextMissingException.class, org.elasticsearch.search.SearchContextMissingException::new, 24),
SCRIPT_EXCEPTION(org.elasticsearch.script.ScriptException.class, org.elasticsearch.script.ScriptException::new, 25),
@ -514,7 +514,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
INDEX_SHARD_ALREADY_EXISTS_EXCEPTION(org.elasticsearch.index.IndexShardAlreadyExistsException.class, org.elasticsearch.index.IndexShardAlreadyExistsException::new, 51),
VERSION_CONFLICT_ENGINE_EXCEPTION(org.elasticsearch.index.engine.VersionConflictEngineException.class, org.elasticsearch.index.engine.VersionConflictEngineException::new, 52),
ENGINE_EXCEPTION(org.elasticsearch.index.engine.EngineException.class, org.elasticsearch.index.engine.EngineException::new, 53),
DOCUMENT_ALREADY_EXISTS_EXCEPTION(org.elasticsearch.index.engine.DocumentAlreadyExistsException.class, org.elasticsearch.index.engine.DocumentAlreadyExistsException::new, 54),
// 54 was DocumentAlreadyExistsException, which is superseded by VersionConflictEngineException
NO_SUCH_NODE_EXCEPTION(org.elasticsearch.action.NoSuchNodeException.class, org.elasticsearch.action.NoSuchNodeException::new, 55),
SETTINGS_EXCEPTION(org.elasticsearch.common.settings.SettingsException.class, org.elasticsearch.common.settings.SettingsException::new, 56),
INDEX_TEMPLATE_MISSING_EXCEPTION(org.elasticsearch.indices.IndexTemplateMissingException.class, org.elasticsearch.indices.IndexTemplateMissingException::new, 57),

View File

@ -47,7 +47,6 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.Mapping;
@ -97,6 +96,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
protected TransportRequestOptions transportOptions() {
return BulkAction.INSTANCE.transportOptions(settings);
}
@Override
protected BulkShardResponse newResponseInstance() {
return new BulkShardResponse();
@ -416,7 +416,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
} catch (Throwable t) {
t = ExceptionsHelper.unwrapCause(t);
boolean retry = false;
if (t instanceof VersionConflictEngineException || (t instanceof DocumentAlreadyExistsException && translate.operation() == UpdateHelper.Operation.UPSERT)) {
if (t instanceof VersionConflictEngineException) {
retry = true;
}
return new UpdateResult(translate, indexRequest, retry, t, null);
@ -460,20 +460,12 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, indexRequest.source()).index(shardId.getIndex()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
final Engine.IndexingOperation operation;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA);
} else {
assert indexRequest.opType() == IndexRequest.OpType.CREATE : indexRequest.opType();
operation = indexShard.prepareCreate(sourceToParse,
indexRequest.version(), indexRequest.versionType(),
Engine.Operation.Origin.REPLICA);
}
final Engine.Index operation = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
operation.execute(indexShard);
indexShard.index(operation);
location = locationToSync(location, operation.getTranslogLocation());
} catch (Throwable e) {
// if its not an ignore replica failure, we need to make sure to bubble up the failure

View File

@ -49,14 +49,14 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* Index request to index a typed JSON document into a specific index and make it searchable. Best
* created using {@link org.elasticsearch.client.Requests#indexRequest(String)}.
* <p>
*
* The index requires the {@link #index()}, {@link #type(String)}, {@link #id(String)} and
* {@link #source(byte[])} to be set.
* <p>
*
* The source (content to index) can be set in its bytes form using ({@link #source(byte[])}),
* its string form ({@link #source(String)}) or using a {@link org.elasticsearch.common.xcontent.XContentBuilder}
* ({@link #source(org.elasticsearch.common.xcontent.XContentBuilder)}).
* <p>
*
* If the {@link #id(String)} is not set, it will be automatically generated.
*
* @see IndexResponse
@ -114,7 +114,7 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
public static OpType fromString(String sOpType) {
String lowersOpType = sOpType.toLowerCase(Locale.ROOT);
switch(lowersOpType){
switch (lowersOpType) {
case "create":
return OpType.CREATE;
case "index":
@ -216,6 +216,14 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
if (source == null) {
validationException = addValidationError("source is missing", validationException);
}
if (opType() == OpType.CREATE) {
if (versionType != VersionType.INTERNAL || version != Versions.MATCH_DELETED) {
validationException = addValidationError("create operations do not support versioning. use index instead", validationException);
return validationException;
}
}
if (!versionType.validateVersionForWrites(version)) {
validationException = addValidationError("illegal version value [" + version + "] for version type [" + versionType.name() + "]", validationException);
}
@ -370,7 +378,7 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
/**
* Sets the document source to index.
* <p>
*
* Note, its preferable to either set it using {@link #source(org.elasticsearch.common.xcontent.XContentBuilder)}
* or using the {@link #source(byte[])}.
*/
@ -480,6 +488,10 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
*/
public IndexRequest opType(OpType opType) {
this.opType = opType;
if (opType == OpType.CREATE) {
version(Versions.MATCH_DELETED);
versionType(VersionType.INTERNAL);
}
return this;
}

View File

@ -54,7 +54,7 @@ import org.elasticsearch.transport.TransportService;
/**
* Performs the index operation.
* <p>
*
* Allows for the following settings:
* <ul>
* <li><b>autoCreateIndex</b>: When set to <tt>true</tt>, will automatically create an index if one does not exists.
@ -167,6 +167,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
IndexShard indexShard = indexService.getShard(shardRequest.shardId.id());
final WriteResult<IndexResponse> result = executeIndexRequestOnPrimary(null, request, indexShard);
final IndexResponse response = result.response;
final Translog.Location location = result.location;
processAfter(request.refresh(), indexShard, location);
@ -180,18 +181,12 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).index(shardId.getIndex()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
final Engine.IndexingOperation operation;
if (request.opType() == IndexRequest.OpType.INDEX) {
operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
} else {
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
operation = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
}
final Engine.Index operation = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
}
operation.execute(indexShard);
indexShard.index(operation);
processAfter(request.refresh(), indexShard, operation.getTranslogLocation());
}

View File

@ -56,7 +56,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.Mapping;
@ -188,9 +187,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
if (cause instanceof VersionConflictEngineException) {
return true;
}
if (cause instanceof DocumentAlreadyExistsException) {
return true;
}
return false;
}
@ -1036,22 +1032,17 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
/** Utility method to create either an index or a create operation depending
* on the {@link OpType} of the request. */
private final Engine.IndexingOperation prepareIndexOperationOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) {
private final Engine.Index prepareIndexOperationOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).index(request.index()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
if (request.opType() == IndexRequest.OpType.INDEX) {
return indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
} else {
assert request.opType() == IndexRequest.OpType.CREATE : request.opType();
return indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
}
}
/** Execute the given {@link IndexRequest} on a primary shard, throwing a
* {@link RetryOnPrimaryException} if the operation needs to be re-tried. */
protected final WriteResult<IndexResponse> executeIndexRequestOnPrimary(BulkShardRequest shardRequest, IndexRequest request, IndexShard indexShard) throws Throwable {
Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
Engine.Index operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = indexShard.shardId();
if (update != null) {
@ -1064,7 +1055,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
"Dynamics mappings are not available on the node that holds the primary yet");
}
}
final boolean created = operation.execute(indexShard);
final boolean created = indexShard.index(operation);
// update the version on request so it will happen on the replicas
final long version = operation.version();

View File

@ -48,9 +48,8 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
@ -170,7 +169,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard);
switch (result.operation()) {
case UPSERT:
IndexRequest upsertRequest = new IndexRequest((IndexRequest)result.action(), request);
IndexRequest upsertRequest = new IndexRequest(result.action(), request);
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference upsertSourceBytes = upsertRequest.source();
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
@ -189,7 +188,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException || e instanceof DocumentAlreadyExistsException) {
if (e instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
@ -205,7 +204,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
});
break;
case INDEX:
IndexRequest indexRequest = new IndexRequest((IndexRequest)result.action(), request);
IndexRequest indexRequest = new IndexRequest(result.action(), request);
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference indexSourceBytes = indexRequest.source();
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@ -235,7 +234,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
});
break;
case DELETE:
DeleteRequest deleteRequest = new DeleteRequest((DeleteRequest)result.action(), request);
DeleteRequest deleteRequest = new DeleteRequest(result.action(), request);
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse response) {

View File

@ -33,10 +33,24 @@ import java.util.concurrent.ConcurrentMap;
/** Utility class to resolve the Lucene doc ID and version for a given uid. */
public class Versions {
public static final long MATCH_ANY = -3L; // Version was not specified by the user
/** used to indicate the write operation should succeed regardless of current version **/
public static final long MATCH_ANY = -3L;
/** indicates that the current document was not found in lucene and in the version map */
public static final long NOT_FOUND = -1L;
/**
* used when the document is old and doesn't contain any version information in the index
* see {@link PerThreadIDAndVersionLookup#lookup(org.apache.lucene.util.BytesRef)}
*/
public static final long NOT_SET = -2L;
/**
* used to indicate that the write operation should be executed if the document is currently deleted
* i.e., not found in the index and/or found as deleted (with version) in the version map
*/
public static final long MATCH_DELETED = -4L;
// TODO: is there somewhere else we can store these?
private static final ConcurrentMap<IndexReader, CloseableThreadLocal<PerThreadIDAndVersionLookup>> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

View File

@ -31,24 +31,37 @@ import java.io.IOException;
public enum VersionType implements Writeable<VersionType> {
INTERNAL((byte) 0) {
@Override
public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion) {
return isVersionConflict(currentVersion, expectedVersion);
public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
return isVersionConflict(currentVersion, expectedVersion, deleted);
}
@Override
public String explainConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
if (expectedVersion == Versions.MATCH_DELETED) {
return "document already exists (current version [" + currentVersion + "])";
}
return "current version [" + currentVersion + "] is different than the one provided [" + expectedVersion + "]";
}
@Override
public boolean isVersionConflictForReads(long currentVersion, long expectedVersion) {
return isVersionConflict(currentVersion, expectedVersion);
return isVersionConflict(currentVersion, expectedVersion, false);
}
private boolean isVersionConflict(long currentVersion, long expectedVersion) {
@Override
public String explainConflictForReads(long currentVersion, long expectedVersion) {
return "current version [" + currentVersion + "] is different than the one provided [" + expectedVersion + "]";
}
private boolean isVersionConflict(long currentVersion, long expectedVersion, boolean deleted) {
if (currentVersion == Versions.NOT_SET) {
return false;
}
if (expectedVersion == Versions.MATCH_ANY) {
return false;
}
if (currentVersion == Versions.NOT_FOUND) {
return true;
if (expectedVersion == Versions.MATCH_DELETED) {
return deleted == false;
}
if (currentVersion != expectedVersion) {
return true;
@ -63,8 +76,7 @@ public enum VersionType implements Writeable<VersionType> {
@Override
public boolean validateVersionForWrites(long version) {
// not allowing Versions.NOT_FOUND as it is not a valid input value.
return version > 0L || version == Versions.MATCH_ANY;
return version > 0L || version == Versions.MATCH_ANY || version == Versions.MATCH_DELETED;
}
@Override
@ -82,7 +94,7 @@ public enum VersionType implements Writeable<VersionType> {
},
EXTERNAL((byte) 1) {
@Override
public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion) {
public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
if (currentVersion == Versions.NOT_SET) {
return false;
}
@ -98,6 +110,11 @@ public enum VersionType implements Writeable<VersionType> {
return false;
}
@Override
public String explainConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
return "current version [" + currentVersion + "] is higher or equal to the one provided [" + expectedVersion + "]";
}
@Override
public boolean isVersionConflictForReads(long currentVersion, long expectedVersion) {
if (currentVersion == Versions.NOT_SET) {
@ -115,6 +132,11 @@ public enum VersionType implements Writeable<VersionType> {
return false;
}
@Override
public String explainConflictForReads(long currentVersion, long expectedVersion) {
return "current version [" + currentVersion + "] is different than the one provided [" + expectedVersion + "]";
}
@Override
public long updateVersion(long currentVersion, long expectedVersion) {
return expectedVersion;
@ -133,7 +155,7 @@ public enum VersionType implements Writeable<VersionType> {
},
EXTERNAL_GTE((byte) 2) {
@Override
public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion) {
public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
if (currentVersion == Versions.NOT_SET) {
return false;
}
@ -149,6 +171,11 @@ public enum VersionType implements Writeable<VersionType> {
return false;
}
@Override
public String explainConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
return "current version [" + currentVersion + "] is higher than the one provided [" + expectedVersion + "]";
}
@Override
public boolean isVersionConflictForReads(long currentVersion, long expectedVersion) {
if (currentVersion == Versions.NOT_SET) {
@ -166,6 +193,11 @@ public enum VersionType implements Writeable<VersionType> {
return false;
}
@Override
public String explainConflictForReads(long currentVersion, long expectedVersion) {
return "current version [" + currentVersion + "] is different than the one provided [" + expectedVersion + "]";
}
@Override
public long updateVersion(long currentVersion, long expectedVersion) {
return expectedVersion;
@ -187,7 +219,7 @@ public enum VersionType implements Writeable<VersionType> {
*/
FORCE((byte) 3) {
@Override
public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion) {
public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
if (currentVersion == Versions.NOT_SET) {
return false;
}
@ -195,16 +227,26 @@ public enum VersionType implements Writeable<VersionType> {
return false;
}
if (expectedVersion == Versions.MATCH_ANY) {
return true;
throw new IllegalStateException("you must specify a version when use VersionType.FORCE");
}
return false;
}
@Override
public String explainConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
throw new AssertionError("VersionType.FORCE should never result in a write conflict");
}
@Override
public boolean isVersionConflictForReads(long currentVersion, long expectedVersion) {
return false;
}
@Override
public String explainConflictForReads(long currentVersion, long expectedVersion) {
throw new AssertionError("VersionType.FORCE should never result in a read conflict");
}
@Override
public long updateVersion(long currentVersion, long expectedVersion) {
return expectedVersion;
@ -237,17 +279,46 @@ public enum VersionType implements Writeable<VersionType> {
/**
* Checks whether the current version conflicts with the expected version, based on the current version type.
*
* @param currentVersion the current version for the document
* @param expectedVersion the version specified for the write operation
* @param deleted true if the document is currently deleted (note that #currentVersion will typically be
* {@link Versions#NOT_FOUND}, but may be something else if the document was recently deleted
* @return true if versions conflict false o.w.
*/
public abstract boolean isVersionConflictForWrites(long currentVersion, long expectedVersion);
public abstract boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted);
/**
* Returns a human readable explanation for a version conflict on write.
*
* Note that this method is only called if {@link #isVersionConflictForWrites(long, long, boolean)} returns true;
*
* @param currentVersion the current version for the document
* @param expectedVersion the version specified for the write operation
* @param deleted true if the document is currently deleted (note that #currentVersion will typically be
* {@link Versions#NOT_FOUND}, but may be something else if the document was recently deleted
*/
public abstract String explainConflictForWrites(long currentVersion, long expectedVersion, boolean deleted);
/**
* Checks whether the current version conflicts with the expected version, based on the current version type.
*
* @param currentVersion the current version for the document
* @param expectedVersion the version specified for the read operation
* @return true if versions conflict false o.w.
*/
public abstract boolean isVersionConflictForReads(long currentVersion, long expectedVersion);
/**
* Returns a human readable explanation for a version conflict on read.
*
* Note that this method is only called if {@link #isVersionConflictForReads(long, long)} returns true;
*
* @param currentVersion the current version for the document
* @param expectedVersion the version specified for the read operation
*/
public abstract String explainConflictForReads(long currentVersion, long expectedVersion);
/**
* Returns the new version for a document, based on its current one and the specified in the request
*

View File

@ -1,66 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Objects;
/**
*
*/
public class CreateFailedEngineException extends EngineException {
private final String type;
private final String id;
public CreateFailedEngineException(ShardId shardId, String type, String id, Throwable cause) {
super(shardId, "Create failed for [" + type + "#" + id + "]", cause);
Objects.requireNonNull(type, "type must not be null");
Objects.requireNonNull(id, "id must not be null");
this.type = type;
this.id = id;
}
public CreateFailedEngineException(StreamInput in) throws IOException{
super(in);
type = in.readString();
id = in.readString();
}
public String type() {
return this.type;
}
public String id() {
return this.id;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(type);
out.writeString(id);
}
}

View File

@ -1,44 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
/**
*
*/
public class DocumentAlreadyExistsException extends EngineException {
public DocumentAlreadyExistsException(ShardId shardId, String type, String id) {
super(shardId, "[" + type + "][" + id + "]: document already exists");
}
public DocumentAlreadyExistsException(StreamInput in) throws IOException{
super(in);
}
@Override
public RestStatus status() {
return RestStatus.CONFLICT;
}
}

View File

@ -45,7 +45,6 @@ import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
@ -60,7 +59,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;
/**
*
@ -144,7 +142,8 @@ public abstract class Engine implements Closeable {
return new MergeStats();
}
/** A throttling class that can be activated, causing the
/**
* A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
* is enabled
*/
@ -203,9 +202,7 @@ public abstract class Engine implements Closeable {
}
}
public abstract void create(Create create) throws EngineException;
public abstract boolean index(Index index) throws EngineException;
public abstract boolean index(Index operation) throws EngineException;
public abstract void delete(Delete delete) throws EngineException;
@ -216,6 +213,7 @@ public abstract class Engine implements Closeable {
/**
* Attempts to do a special commit where the given syncID is put into the commit data. The attempt
* succeeds if there are not pending writes in lucene and the current point is equal to the expected one.
*
* @param syncId id of this sync
* @param expectedCommitId the expected value of
* @return true if the sync commit was made, false o.w.
@ -243,7 +241,8 @@ public abstract class Engine implements Closeable {
if (get.versionType().isVersionConflictForReads(docIdAndVersion.version, get.version())) {
Releasables.close(searcher);
Uid uid = Uid.createUid(get.uid().text());
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), docIdAndVersion.version, get.version());
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(),
get.versionType().explainConflictForReads(docIdAndVersion.version, get.version()));
}
}
@ -328,7 +327,7 @@ public abstract class Engine implements Closeable {
} catch (IOException e) {
// Fall back to reading from the store if reading from the commit fails
try {
return store. readLastCommittedSegmentsInfo();
return store.readLastCommittedSegmentsInfo();
} catch (IOException e2) {
e2.addSuppressed(e);
throw e2;
@ -469,6 +468,7 @@ public abstract class Engine implements Closeable {
/**
* Flushes the state of the engine including the transaction log, clearing memory.
*
* @param force if <code>true</code> a lucene commit is executed even if no changes need to be committed.
* @param waitIfOngoing if <code>true</code> this call will block until all currently running flushes have finished.
* Otherwise this call will return without blocking.
@ -607,62 +607,97 @@ public abstract class Engine implements Closeable {
}
}
public static interface Operation {
static enum Type {
CREATE,
INDEX,
DELETE
}
static enum Origin {
PRIMARY,
REPLICA,
RECOVERY
}
Type opType();
Origin origin();
}
public static abstract class IndexingOperation implements Operation {
public static abstract class Operation {
private final Term uid;
private final ParsedDocument doc;
private long version;
private final VersionType versionType;
private final Origin origin;
private Translog.Location location;
private final long startTime;
private long endTime;
public IndexingOperation(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
public Operation(Term uid, long version, VersionType versionType, Origin origin, long startTime) {
this.uid = uid;
this.doc = doc;
this.version = version;
this.versionType = versionType;
this.origin = origin;
this.startTime = startTime;
}
public IndexingOperation(Term uid, ParsedDocument doc) {
this(uid, doc, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
public static enum Origin {
PRIMARY,
REPLICA,
RECOVERY
}
@Override
public Origin origin() {
return this.origin;
}
public ParsedDocument parsedDoc() {
return this.doc;
}
public Term uid() {
return this.uid;
}
public long version() {
return this.version;
}
public void updateVersion(long version) {
this.version = version;
}
public void setTranslogLocation(Translog.Location location) {
this.location = location;
}
public Translog.Location getTranslogLocation() {
return this.location;
}
public VersionType versionType() {
return this.versionType;
}
/**
* Returns operation start time in nanoseconds.
*/
public long startTime() {
return this.startTime;
}
public void endTime(long endTime) {
this.endTime = endTime;
}
/**
* Returns operation end time in nanoseconds.
*/
public long endTime() {
return this.endTime;
}
}
public static class Index extends Operation {
private final ParsedDocument doc;
public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
super(uid, version, versionType, origin, startTime);
this.doc = doc;
}
public Index(Term uid, ParsedDocument doc) {
this(uid, doc, Versions.MATCH_ANY);
}
public Index(Term uid, ParsedDocument doc, long version) {
this(uid, doc, version, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
}
public ParsedDocument parsedDoc() {
return this.doc;
}
public String type() {
return this.doc.type();
}
@ -683,27 +718,12 @@ public abstract class Engine implements Closeable {
return this.doc.ttl();
}
public long version() {
return this.version;
}
@Override
public void updateVersion(long version) {
this.version = version;
super.updateVersion(version);
this.doc.version().setLongValue(version);
}
public void setTranslogLocation(Translog.Location location) {
this.location = location;
}
public Translog.Location getTranslogLocation() {
return this.location;
}
public VersionType versionType() {
return this.versionType;
}
public String parent() {
return this.doc.parent();
}
@ -715,96 +735,17 @@ public abstract class Engine implements Closeable {
public BytesReference source() {
return this.doc.source();
}
/**
* Returns operation start time in nanoseconds.
*/
public long startTime() {
return this.startTime;
}
public void endTime(long endTime) {
this.endTime = endTime;
}
/**
* Returns operation end time in nanoseconds.
*/
public long endTime() {
return this.endTime;
}
/**
* Execute this operation against the provided {@link IndexShard} and
* return whether the document was created.
*/
public abstract boolean execute(IndexShard shard);
}
public static final class Create extends IndexingOperation {
public Create(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
super(uid, doc, version, versionType, origin, startTime);
}
public Create(Term uid, ParsedDocument doc) {
super(uid, doc);
}
@Override
public Type opType() {
return Type.CREATE;
}
@Override
public boolean execute(IndexShard shard) {
shard.create(this);
return true;
}
}
public static final class Index extends IndexingOperation {
public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
super(uid, doc, version, versionType, origin, startTime);
}
public Index(Term uid, ParsedDocument doc) {
super(uid, doc);
}
@Override
public Type opType() {
return Type.INDEX;
}
@Override
public boolean execute(IndexShard shard) {
return shard.index(this);
}
}
public static class Delete implements Operation {
public static class Delete extends Operation {
private final String type;
private final String id;
private final Term uid;
private long version;
private final VersionType versionType;
private final Origin origin;
private boolean found;
private final long startTime;
private long endTime;
private Translog.Location location;
public Delete(String type, String id, Term uid, long version, VersionType versionType, Origin origin, long startTime, boolean found) {
super(uid, version, versionType, origin, startTime);
this.type = type;
this.id = id;
this.uid = uid;
this.version = version;
this.versionType = versionType;
this.origin = origin;
this.startTime = startTime;
this.found = found;
}
@ -816,16 +757,6 @@ public abstract class Engine implements Closeable {
this(template.type(), template.id(), template.uid(), template.version(), versionType, template.origin(), template.startTime(), template.found());
}
@Override
public Type opType() {
return Type.DELETE;
}
@Override
public Origin origin() {
return this.origin;
}
public String type() {
return this.type;
}
@ -834,55 +765,14 @@ public abstract class Engine implements Closeable {
return this.id;
}
public Term uid() {
return this.uid;
}
public void updateVersion(long version, boolean found) {
this.version = version;
updateVersion(version);
this.found = found;
}
/**
* before delete execution this is the version to be deleted. After this is the version of the "delete" transaction record.
*/
public long version() {
return this.version;
}
public VersionType versionType() {
return this.versionType;
}
public boolean found() {
return this.found;
}
/**
* Returns operation start time in nanoseconds.
*/
public long startTime() {
return this.startTime;
}
public void endTime(long endTime) {
this.endTime = endTime;
}
/**
* Returns operation end time in nanoseconds.
*/
public long endTime() {
return this.endTime;
}
public void setTranslogLocation(Translog.Location location) {
this.location = location;
}
public Translog.Location getTranslogLocation() {
return this.location;
}
}
public static class DeleteByQuery {
@ -1135,12 +1025,18 @@ public abstract class Engine implements Closeable {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CommitId commitId = (CommitId) o;
if (!Arrays.equals(id, commitId.id)) return false;
if (!Arrays.equals(id, commitId.id)) {
return false;
}
return true;
}
@ -1151,5 +1047,6 @@ public abstract class Engine implements Closeable {
}
}
public void onSettingsChanged() {}
public void onSettingsChanged() {
}
}

View File

@ -258,10 +258,10 @@ public final class EngineConfig {
/**
* Returns a {@link org.elasticsearch.index.indexing.ShardIndexingService} used inside the engine to inform about
* pre and post index and create operations. The operations are used for statistic purposes etc.
* pre and post index. The operations are used for statistic purposes etc.
*
* @see org.elasticsearch.index.indexing.ShardIndexingService#postCreate(org.elasticsearch.index.engine.Engine.Create)
* @see org.elasticsearch.index.indexing.ShardIndexingService#preCreate(org.elasticsearch.index.engine.Engine.Create)
* @see org.elasticsearch.index.indexing.ShardIndexingService#postIndex(Engine.Index)
* @see org.elasticsearch.index.indexing.ShardIndexingService#preIndex(Engine.Index)
*
*/
public ShardIndexingService getIndexingService() {

View File

@ -30,16 +30,16 @@ import java.io.IOException;
*/
public class EngineException extends ElasticsearchException {
public EngineException(ShardId shardId, String msg) {
this(shardId, msg, null);
public EngineException(ShardId shardId, String msg, Object... params) {
this(shardId, msg, null, params);
}
public EngineException(ShardId shardId, String msg, Throwable cause) {
super(msg, cause);
public EngineException(ShardId shardId, String msg, Throwable cause, Object... params) {
super(msg, cause, params);
setShard(shardId);
}
public EngineException(StreamInput in) throws IOException{
public EngineException(StreamInput in) throws IOException {
super(in);
}
}

View File

@ -316,7 +316,8 @@ public class InternalEngine extends Engine {
}
if (get.versionType().isVersionConflictForReads(versionValue.version(), get.version())) {
Uid uid = Uid.createUid(get.uid().text());
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), versionValue.version(), get.version());
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(),
get.versionType().explainConflictForReads(versionValue.version(), get.version()));
}
Translog.Operation op = translog.read(versionValue.translogLocation());
if (op != null) {
@ -331,96 +332,7 @@ public class InternalEngine extends Engine {
}
@Override
public void create(Create create) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (create.origin() == Operation.Origin.RECOVERY) {
// Don't throttle recovery operations
innerCreate(create);
} else {
try (Releasable r = throttle.acquireThrottle()) {
innerCreate(create);
}
}
} catch (OutOfMemoryError | IllegalStateException | IOException t) {
maybeFailEngine("create", t);
throw new CreateFailedEngineException(shardId, create.type(), create.id(), t);
}
checkVersionMapRefresh();
}
private void innerCreate(Create create) throws IOException {
synchronized (dirtyLock(create.uid())) {
final long currentVersion;
final VersionValue versionValue;
versionValue = versionMap.getUnderLock(create.uid().bytes());
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(create.uid());
} else {
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
}
}
innerCreateUnderLock(create, currentVersion, versionValue);
}
}
private void innerCreateUnderLock(Create create, long currentVersion, VersionValue versionValue) throws IOException {
// same logic as index
long updatedVersion;
long expectedVersion = create.version();
if (create.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
if (create.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
}
}
updatedVersion = create.versionType().updateVersion(currentVersion, expectedVersion);
// if the doc exists
boolean doUpdate = false;
if ((versionValue != null && versionValue.delete() == false) || (versionValue == null && currentVersion != Versions.NOT_FOUND)) {
if (create.origin() == Operation.Origin.RECOVERY) {
return;
} else if (create.origin() == Operation.Origin.REPLICA) {
// #7142: the primary already determined it's OK to index this document, and we confirmed above that the version doesn't
// conflict, so we must also update here on the replica to remain consistent:
doUpdate = true;
} else {
// On primary, we throw DAEE if the _uid is already in the index with an older version:
assert create.origin() == Operation.Origin.PRIMARY;
throw new DocumentAlreadyExistsException(shardId, create.type(), create.id());
}
}
create.updateVersion(updatedVersion);
if (doUpdate) {
if (create.docs().size() > 1) {
indexWriter.updateDocuments(create.uid(), create.docs());
} else {
indexWriter.updateDocument(create.uid(), create.docs().get(0));
}
} else {
if (create.docs().size() > 1) {
indexWriter.addDocuments(create.docs());
} else {
indexWriter.addDocument(create.docs().get(0));
}
}
Translog.Location translogLocation = translog.add(new Translog.Create(create));
versionMap.putUnderLock(create.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
create.setTranslogLocation(translogLocation);
indexingService.postCreateUnderLock(create);
}
@Override
public boolean index(Index index) throws EngineException {
public boolean index(Index index) {
final boolean created;
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
@ -440,6 +352,67 @@ public class InternalEngine extends Engine {
return created;
}
private boolean innerIndex(Index index) throws IOException {
synchronized (dirtyLock(index.uid())) {
final long currentVersion;
final boolean deleted;
VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(index.uid());
deleted = currentVersion == Versions.NOT_FOUND;
} else {
deleted = versionValue.delete();
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
}
}
long expectedVersion = index.version();
if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
if (index.origin() == Operation.Origin.RECOVERY) {
return false;
} else {
throw new VersionConflictEngineException(shardId, index.type(), index.id(),
index.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
}
}
long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
final boolean created;
index.updateVersion(updatedVersion);
if (currentVersion == Versions.NOT_FOUND) {
// document does not exists, we can optimize for create
created = true;
if (index.docs().size() > 1) {
indexWriter.addDocuments(index.docs());
} else {
indexWriter.addDocument(index.docs().get(0));
}
} else {
if (versionValue != null) {
created = versionValue.delete(); // we have a delete which is not GC'ed...
} else {
created = false;
}
if (index.docs().size() > 1) {
indexWriter.updateDocuments(index.uid(), index.docs());
} else {
indexWriter.updateDocument(index.uid(), index.docs().get(0));
}
}
Translog.Location translogLocation = translog.add(new Translog.Index(index));
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
index.setTranslogLocation(translogLocation);
indexingService.postIndexUnderLock(index);
return created;
}
}
/**
* Forces a refresh if the versionMap is using too much RAM
*/
@ -467,62 +440,6 @@ public class InternalEngine extends Engine {
}
}
private boolean innerIndex(Index index) throws IOException {
synchronized (dirtyLock(index.uid())) {
final long currentVersion;
VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(index.uid());
} else {
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
}
}
long updatedVersion;
long expectedVersion = index.version();
if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
if (index.origin() == Operation.Origin.RECOVERY) {
return false;
} else {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
}
}
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
final boolean created;
index.updateVersion(updatedVersion);
if (currentVersion == Versions.NOT_FOUND) {
// document does not exists, we can optimize for create
created = true;
if (index.docs().size() > 1) {
indexWriter.addDocuments(index.docs());
} else {
indexWriter.addDocument(index.docs().get(0));
}
} else {
if (versionValue != null) {
created = versionValue.delete(); // we have a delete which is not GC'ed...
} else {
created = false;
}
if (index.docs().size() > 1) {
indexWriter.updateDocuments(index.uid(), index.docs());
} else {
indexWriter.updateDocument(index.uid(), index.docs().get(0));
}
}
Translog.Location translogLocation = translog.add(new Translog.Index(index));
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));
index.setTranslogLocation(translogLocation);
indexingService.postIndexUnderLock(index);
return created;
}
}
@Override
public void delete(Delete delete) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
@ -549,10 +466,13 @@ public class InternalEngine extends Engine {
private void innerDelete(Delete delete) throws IOException {
synchronized (dirtyLock(delete.uid())) {
final long currentVersion;
final boolean deleted;
VersionValue versionValue = versionMap.getUnderLock(delete.uid().bytes());
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(delete.uid());
deleted = currentVersion == Versions.NOT_FOUND;
} else {
deleted = versionValue.delete();
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
@ -562,11 +482,12 @@ public class InternalEngine extends Engine {
long updatedVersion;
long expectedVersion = delete.version();
if (delete.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
if (delete.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
if (delete.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, expectedVersion);
throw new VersionConflictEngineException(shardId, delete.type(), delete.id(),
delete.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
}
}
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);

View File

@ -102,11 +102,6 @@ public class ShadowEngine extends Engine {
}
@Override
public void create(Create create) throws EngineException {
throw new UnsupportedOperationException(shardId + " create operation not allowed on shadow engine");
}
@Override
public boolean index(Index index) throws EngineException {
throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine");

View File

@ -29,8 +29,16 @@ import java.io.IOException;
*/
public class VersionConflictEngineException extends EngineException {
public VersionConflictEngineException(ShardId shardId, String type, String id, long current, long provided) {
super(shardId, "[" + type + "][" + id + "]: version conflict, current [" + current + "], provided [" + provided + "]");
public VersionConflictEngineException(ShardId shardId, String type, String id, String explanation) {
this(shardId, null, type, id, explanation);
}
public VersionConflictEngineException(ShardId shardId, Throwable cause, String type, String id, String explanation) {
this(shardId, "[{}][{}]: version conflict, {}", cause, type, id, explanation);
}
public VersionConflictEngineException(ShardId shardId, String msg, Throwable cause, Object... params) {
super(shardId, msg, cause, params);
}
@Override

View File

@ -28,39 +28,8 @@ public abstract class IndexingOperationListener {
/**
* Called before the indexing occurs.
*/
public Engine.Create preCreate(Engine.Create create) {
return create;
}
/**
* Called after the indexing occurs, under a locking scheme to maintain
* concurrent updates to the same doc.
* <p>
* Note, long operations should not occur under this callback.
*/
public void postCreateUnderLock(Engine.Create create) {
}
/**
* Called after create index operation occurred.
*/
public void postCreate(Engine.Create create) {
}
/**
* Called after create index operation occurred with exception.
*/
public void postCreate(Engine.Create create, Throwable ex) {
}
/**
* Called before the indexing occurs.
*/
public Engine.Index preIndex(Engine.Index index) {
return index;
public Engine.Index preIndex(Engine.Index operation) {
return operation;
}
/**

View File

@ -128,10 +128,6 @@ public final class IndexingSlowLog {
postIndexing(index.parsedDoc(), tookInNanos);
}
void postCreate(Engine.Create create, long tookInNanos) {
postIndexing(create.parsedDoc(), tookInNanos);
}
/**
* Reads how much of the source to log. The user can specify any value they
* like and numbers are interpreted the maximum number of characters to log

View File

@ -85,25 +85,6 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
listeners.remove(listener);
}
public Engine.Create preCreate(Engine.Create create) {
totalStats.indexCurrent.inc();
typeStats(create.type()).indexCurrent.inc();
for (IndexingOperationListener listener : listeners) {
create = listener.preCreate(create);
}
return create;
}
public void postCreateUnderLock(Engine.Create create) {
for (IndexingOperationListener listener : listeners) {
try {
listener.postCreateUnderLock(create);
} catch (Exception e) {
logger.warn("postCreateUnderLock listener [{}] failed", e, listener);
}
}
}
public void throttlingActivated() {
totalStats.setThrottled(true);
}
@ -112,40 +93,13 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
totalStats.setThrottled(false);
}
public void postCreate(Engine.Create create) {
long took = create.endTime() - create.startTime();
totalStats.indexMetric.inc(took);
totalStats.indexCurrent.dec();
StatsHolder typeStats = typeStats(create.type());
typeStats.indexMetric.inc(took);
typeStats.indexCurrent.dec();
slowLog.postCreate(create, took);
for (IndexingOperationListener listener : listeners) {
try {
listener.postCreate(create);
} catch (Exception e) {
logger.warn("postCreate listener [{}] failed", e, listener);
}
}
}
public void postCreate(Engine.Create create, Throwable ex) {
for (IndexingOperationListener listener : listeners) {
try {
listener.postCreate(create, ex);
} catch (Throwable t) {
logger.warn("postCreate listener [{}] failed", t, listener);
}
}
}
public Engine.Index preIndex(Engine.Index index) {
public Engine.Index preIndex(Engine.Index operation) {
totalStats.indexCurrent.inc();
typeStats(index.type()).indexCurrent.inc();
typeStats(operation.type()).indexCurrent.inc();
for (IndexingOperationListener listener : listeners) {
index = listener.preIndex(index);
operation = listener.preIndex(operation);
}
return index;
return operation;
}
public void postIndexUnderLock(Engine.Index index) {

View File

@ -242,29 +242,12 @@ public final class PercolatorQueriesRegistry extends AbstractIndexShardComponent
private class RealTimePercolatorOperationListener extends IndexingOperationListener {
@Override
public Engine.Create preCreate(Engine.Create create) {
public Engine.Index preIndex(Engine.Index operation) {
// validate the query here, before we index
if (PercolatorService.TYPE_NAME.equals(create.type())) {
parsePercolatorDocument(create.id(), create.source());
if (PercolatorService.TYPE_NAME.equals(operation.type())) {
parsePercolatorDocument(operation.id(), operation.source());
}
return create;
}
@Override
public void postCreateUnderLock(Engine.Create create) {
// add the query under a doc lock
if (PercolatorService.TYPE_NAME.equals(create.type())) {
addPercolateQuery(create.id(), create.source());
}
}
@Override
public Engine.Index preIndex(Engine.Index index) {
// validate the query here, before we index
if (PercolatorService.TYPE_NAME.equals(index.type())) {
parsePercolatorDocument(index.id(), index.source());
}
return index;
return operation;
}
@Override

View File

@ -278,7 +278,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
return indexFieldDataService;
}
public MapperService mapperService() { return mapperService;}
public MapperService mapperService() {
return mapperService;
}
public ShardSearchStats searchService() {
return this.searchService;
@ -423,40 +425,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
return previousState;
}
public Engine.Create prepareCreate(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) {
try {
return prepareCreate(docMapper(source.type()), source, version, versionType, origin);
} catch (Throwable t) {
verifyNotClosed(t);
throw t;
}
}
static Engine.Create prepareCreate(DocumentMapperForType docMapper, SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) {
long startTime = System.nanoTime();
ParsedDocument doc = docMapper.getDocumentMapper().parse(source);
if (docMapper.getMapping() != null) {
doc.addDynamicMappingsUpdate(docMapper.getMapping());
}
return new Engine.Create(docMapper.getDocumentMapper().uidMapper().term(doc.uid().stringValue()), doc, version, versionType, origin, startTime);
}
public void create(Engine.Create create) {
writeAllowed(create.origin());
create = indexingService.preCreate(create);
try {
if (logger.isTraceEnabled()) {
logger.trace("index [{}][{}]{}", create.type(), create.id(), create.docs());
}
getEngine().create(create);
create.endTime(System.nanoTime());
} catch (Throwable ex) {
indexingService.postCreate(create, ex);
throw ex;
}
indexingService.postCreate(create);
}
public Engine.Index prepareIndex(SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) {
try {
return prepareIndex(docMapper(source.type()), source, version, versionType, origin);
@ -1499,6 +1467,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
/**
* Schedules a flush if needed but won't schedule more than one flush concurrently. The flush will be executed on the
* Flush thread-pool asynchronously.
*
* @return <code>true</code> if a new flush is scheduled otherwise <code>false</code>.
*/
public boolean maybeFlush() {

View File

@ -145,19 +145,7 @@ public class TranslogRecoveryPerformer {
public void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates) {
try {
switch (operation.opType()) {
case CREATE:
Translog.Create create = (Translog.Create) operation;
Engine.Create engineCreate = IndexShard.prepareCreate(docMapper(create.type()),
source(create.source()).index(shardId.getIndex()).type(create.type()).id(create.id())
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY);
maybeAddMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate(), engineCreate.id(), allowMappingUpdates);
if (logger.isTraceEnabled()) {
logger.trace("[translog] recover [create] op of [{}][{}]", create.type(), create.id());
}
engine.create(engineCreate);
break;
case SAVE:
case INDEX:
Translog.Index index = (Translog.Index) operation;
Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), source(index.source()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),

View File

@ -465,11 +465,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
/**
* Adds a created / delete / index operations to the transaction log.
* Adds a delete / index operations to the transaction log.
*
* @see org.elasticsearch.index.translog.Translog.Operation
* @see org.elasticsearch.index.translog.Translog.Create
* @see org.elasticsearch.index.translog.Translog.Index
* @see Index
* @see org.elasticsearch.index.translog.Translog.Delete
*/
public Location add(Operation operation) throws TranslogException {
@ -874,8 +873,9 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
*/
public interface Operation extends Streamable {
enum Type {
@Deprecated
CREATE((byte) 1),
SAVE((byte) 2),
INDEX((byte) 2),
DELETE((byte) 3),
DELETE_BY_QUERY((byte) 4);
@ -894,7 +894,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
case 1:
return CREATE;
case 2:
return SAVE;
return INDEX;
case 3:
return DELETE;
case 4:
@ -929,199 +929,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
}
public static class Create implements Operation {
public static final int SERIALIZATION_FORMAT = 6;
private String id;
private String type;
private BytesReference source;
private String routing;
private String parent;
private long timestamp;
private long ttl;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
public Create() {
}
public Create(Engine.Create create) {
this.id = create.id();
this.type = create.type();
this.source = create.source();
this.routing = create.routing();
this.parent = create.parent();
this.timestamp = create.timestamp();
this.ttl = create.ttl();
this.version = create.version();
this.versionType = create.versionType();
}
public Create(String type, String id, byte[] source) {
this.id = id;
this.type = type;
this.source = new BytesArray(source);
}
@Override
public Type opType() {
return Type.CREATE;
}
@Override
public long estimateSize() {
return ((id.length() + type.length()) * 2) + source.length() + 12;
}
public String id() {
return this.id;
}
public BytesReference source() {
return this.source;
}
public String type() {
return this.type;
}
public String routing() {
return this.routing;
}
public String parent() {
return this.parent;
}
public long timestamp() {
return this.timestamp;
}
public long ttl() {
return this.ttl;
}
public long version() {
return this.version;
}
public VersionType versionType() {
return versionType;
}
@Override
public Source getSource() {
return new Source(source, routing, parent, timestamp, ttl);
}
@Override
public void readFrom(StreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readString();
type = in.readString();
source = in.readBytesReference();
if (version >= 1) {
if (in.readBoolean()) {
routing = in.readString();
}
}
if (version >= 2) {
if (in.readBoolean()) {
parent = in.readString();
}
}
if (version >= 3) {
this.version = in.readLong();
}
if (version >= 4) {
this.timestamp = in.readLong();
}
if (version >= 5) {
this.ttl = in.readLong();
}
if (version >= 6) {
this.versionType = VersionType.fromValue(in.readByte());
}
assert versionType.validateVersionForWrites(version);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(id);
out.writeString(type);
out.writeBytesReference(source);
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeString(routing);
}
if (parent == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeString(parent);
}
out.writeLong(version);
out.writeLong(timestamp);
out.writeLong(ttl);
out.writeByte(versionType.getValue());
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Create create = (Create) o;
if (timestamp != create.timestamp ||
ttl != create.ttl ||
version != create.version ||
id.equals(create.id) == false ||
type.equals(create.type) == false ||
source.equals(create.source) == false) {
return false;
}
if (routing != null ? !routing.equals(create.routing) : create.routing != null) {
return false;
}
if (parent != null ? !parent.equals(create.parent) : create.parent != null) {
return false;
}
return versionType == create.versionType;
}
@Override
public int hashCode() {
int result = id.hashCode();
result = 31 * result + type.hashCode();
result = 31 * result + source.hashCode();
result = 31 * result + (routing != null ? routing.hashCode() : 0);
result = 31 * result + (parent != null ? parent.hashCode() : 0);
result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
result = 31 * result + (int) (ttl ^ (ttl >>> 32));
result = 31 * result + (int) (version ^ (version >>> 32));
result = 31 * result + versionType.hashCode();
return result;
}
@Override
public String toString() {
return "Create{" +
"id='" + id + '\'' +
", type='" + type + '\'' +
'}';
}
}
public static class Index implements Operation {
public static final int SERIALIZATION_FORMAT = 6;
@ -1158,7 +965,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
@Override
public Type opType() {
return Type.SAVE;
return Type.INDEX;
}
@Override
@ -1667,13 +1474,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
static Translog.Operation newOperationFromType(Translog.Operation.Type type) throws IOException {
switch (type) {
case CREATE:
return new Translog.Create();
// the deserialization logic in Index was identical to that of Create when create was deprecated
return new Index();
case DELETE:
return new Translog.Delete();
case DELETE_BY_QUERY:
return new Translog.DeleteByQuery();
case SAVE:
return new Translog.Index();
case INDEX:
return new Index();
default:
throw new IOException("No type for [" + type + "]");
}

View File

@ -20,7 +20,6 @@ package org.elasticsearch;
import com.fasterxml.jackson.core.JsonLocation;
import com.fasterxml.jackson.core.JsonParseException;
import org.apache.lucene.util.Constants;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.RoutingMissingException;
@ -31,12 +30,7 @@ import org.elasticsearch.client.AbstractClientHeadersTestCase;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IllegalShardRoutingStateException;
import org.elasticsearch.cluster.routing.RoutingTableValidation;
import org.elasticsearch.cluster.routing.RoutingValidationException;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.io.PathUtils;
@ -55,7 +49,6 @@ import org.elasticsearch.common.xcontent.XContentLocation;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.AlreadyExpiredException;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.CreateFailedEngineException;
import org.elasticsearch.index.engine.IndexFailedEngineException;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MergeMappingException;
@ -139,9 +132,9 @@ public class ExceptionSerializationTests extends ESTestCase {
Class<?> clazz = loadClass(filename);
if (ignore.contains(clazz) == false) {
if (Modifier.isAbstract(clazz.getModifiers()) == false && Modifier.isInterface(clazz.getModifiers()) == false && isEsException(clazz)) {
if (ElasticsearchException.isRegistered((Class<? extends Throwable>)clazz) == false && ElasticsearchException.class.equals(clazz.getEnclosingClass()) == false) {
if (ElasticsearchException.isRegistered((Class<? extends Throwable>) clazz) == false && ElasticsearchException.class.equals(clazz.getEnclosingClass()) == false) {
notRegistered.add(clazz);
} else if (ElasticsearchException.isRegistered((Class<? extends Throwable>)clazz)) {
} else if (ElasticsearchException.isRegistered((Class<? extends Throwable>) clazz)) {
registered.add(clazz);
try {
if (clazz.getDeclaredMethod("writeTo", StreamOutput.class) != null) {
@ -199,7 +192,7 @@ public class ExceptionSerializationTests extends ESTestCase {
}
public static final class TestException extends ElasticsearchException {
public TestException(StreamInput in) throws IOException{
public TestException(StreamInput in) throws IOException {
super(in);
}
}
@ -247,7 +240,7 @@ public class ExceptionSerializationTests extends ESTestCase {
assertEquals(ex.getIndex(), "foo");
assertEquals(ex.getMessage(), "fobar");
ex = serialize(new QueryShardException((Index)null, null, null));
ex = serialize(new QueryShardException((Index) null, null, null));
assertNull(ex.getIndex());
assertNull(ex.getMessage());
}
@ -282,22 +275,8 @@ public class ExceptionSerializationTests extends ESTestCase {
assertEquals(-3, alreadyExpiredException.now());
}
public void testCreateFailedEngineException() throws IOException {
CreateFailedEngineException ex = serialize(new CreateFailedEngineException(new ShardId("idx", 2), "type", "id", null));
assertEquals(ex.getShardId(), new ShardId("idx", 2));
assertEquals("type", ex.type());
assertEquals("id", ex.id());
assertNull(ex.getCause());
ex = serialize(new CreateFailedEngineException(null, "type", "id", new NullPointerException()));
assertNull(ex.getShardId());
assertEquals("type", ex.type());
assertEquals("id", ex.id());
assertTrue(ex.getCause() instanceof NullPointerException);
}
public void testMergeMappingException() throws IOException {
MergeMappingException ex = serialize(new MergeMappingException(new String[] {"one", "two"}));
MergeMappingException ex = serialize(new MergeMappingException(new String[]{"one", "two"}));
assertArrayEquals(ex.failures(), new String[]{"one", "two"});
}
@ -342,7 +321,7 @@ public class ExceptionSerializationTests extends ESTestCase {
assertEquals("the dude abides!", ex.name());
assertEquals("index_template [the dude abides!] already exists", ex.getMessage());
ex = serialize(new IndexTemplateAlreadyExistsException((String)null));
ex = serialize(new IndexTemplateAlreadyExistsException((String) null));
assertNull(ex.name());
assertEquals("index_template [null] already exists", ex.getMessage());
}
@ -449,7 +428,7 @@ public class ExceptionSerializationTests extends ESTestCase {
assertEquals(ctx.shardTarget(), ex.shard());
}
public void testIllegalIndexShardStateException()throws IOException {
public void testIllegalIndexShardStateException() throws IOException {
ShardId id = new ShardId("foo", 1);
IndexShardState state = randomFrom(IndexShardState.values());
IllegalIndexShardStateException ex = serialize(new IllegalIndexShardStateException(id, state, "come back later buddy"));
@ -480,7 +459,7 @@ public class ExceptionSerializationTests extends ESTestCase {
assertEquals("baam", ex.getMessage());
assertTrue(ex.getCause() instanceof NullPointerException);
assertEquals(empty.length, ex.shardFailures().length);
ShardSearchFailure[] one = new ShardSearchFailure[] {
ShardSearchFailure[] one = new ShardSearchFailure[]{
new ShardSearchFailure(new IllegalArgumentException("nono!"))
};
@ -521,7 +500,7 @@ public class ExceptionSerializationTests extends ESTestCase {
assertEquals("index_template [name] missing", ex.getMessage());
assertEquals("name", ex.name());
ex = serialize(new IndexTemplateMissingException((String)null));
ex = serialize(new IndexTemplateMissingException((String) null));
assertEquals("index_template [null] missing", ex.getMessage());
assertNull(ex.name());
}
@ -570,8 +549,8 @@ public class ExceptionSerializationTests extends ESTestCase {
ex = serialize(new NotSerializableExceptionWrapper(new IllegalArgumentException("nono!")));
assertEquals("{\"type\":\"illegal_argument_exception\",\"reason\":\"nono!\"}", toXContent(ex));
Throwable[] unknowns = new Throwable[] {
new JsonParseException("foobar", new JsonLocation(new Object(), 1,2,3,4)),
Throwable[] unknowns = new Throwable[]{
new JsonParseException("foobar", new JsonLocation(new Object(), 1, 2, 3, 4)),
new ClassCastException("boom boom boom"),
new IOException("booom")
};
@ -609,7 +588,7 @@ public class ExceptionSerializationTests extends ESTestCase {
UnknownHeaderException uhe = new UnknownHeaderException("msg", status);
uhe.addHeader("foo", "foo", "bar");
ElasticsearchException serialize = serialize((ElasticsearchException)uhe);
ElasticsearchException serialize = serialize((ElasticsearchException) uhe);
assertTrue(serialize instanceof NotSerializableExceptionWrapper);
NotSerializableExceptionWrapper e = (NotSerializableExceptionWrapper) serialize;
assertEquals("msg", e.getMessage());
@ -684,7 +663,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(19, org.elasticsearch.ResourceNotFoundException.class);
ids.put(20, org.elasticsearch.transport.ActionTransportException.class);
ids.put(21, org.elasticsearch.ElasticsearchGenerationException.class);
ids.put(22, org.elasticsearch.index.engine.CreateFailedEngineException.class);
ids.put(22, null); // was CreateFailedEngineException
ids.put(23, org.elasticsearch.index.shard.IndexShardStartedException.class);
ids.put(24, org.elasticsearch.search.SearchContextMissingException.class);
ids.put(25, org.elasticsearch.script.ScriptException.class);
@ -716,7 +695,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(51, org.elasticsearch.index.IndexShardAlreadyExistsException.class);
ids.put(52, org.elasticsearch.index.engine.VersionConflictEngineException.class);
ids.put(53, org.elasticsearch.index.engine.EngineException.class);
ids.put(54, org.elasticsearch.index.engine.DocumentAlreadyExistsException.class);
ids.put(54, null); // was DocumentAlreadyExistsException, which is superseded with VersionConflictEngineException
ids.put(55, org.elasticsearch.action.NoSuchNodeException.class);
ids.put(56, org.elasticsearch.common.settings.SettingsException.class);
ids.put(57, org.elasticsearch.indices.IndexTemplateMissingException.class);
@ -813,7 +792,7 @@ public class ExceptionSerializationTests extends ESTestCase {
}
for (ElasticsearchException.ElasticsearchExceptionHandle handle : ElasticsearchException.ElasticsearchExceptionHandle.values()) {
assertEquals((int)reverse.get(handle.exceptionClass), handle.id);
assertEquals((int) reverse.get(handle.exceptionClass), handle.id);
}
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

View File

@ -18,9 +18,15 @@
*/
package org.elasticsearch.action.index;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.test.ESTestCase;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import static org.hamcrest.Matchers.*;
/**
*/
@ -39,9 +45,23 @@ public class IndexRequestTests extends ESTestCase {
assertThat(IndexRequest.OpType.fromString(indexUpper), equalTo(IndexRequest.OpType.INDEX));
}
@Test(expected= IllegalArgumentException.class)
public void testReadBogusString(){
@Test(expected = IllegalArgumentException.class)
public void testReadBogusString() {
String foobar = "foobar";
IndexRequest.OpType.fromString(foobar);
}
public void testCreateOperationRejectsVersions() {
Set<VersionType> allButInternalSet = new HashSet<>(Arrays.asList(VersionType.values()));
allButInternalSet.remove(VersionType.INTERNAL);
VersionType[] allButInternal = allButInternalSet.toArray(new VersionType[]{});
IndexRequest request = new IndexRequest("index", "type", "1");
request.opType(IndexRequest.OpType.CREATE);
request.versionType(randomFrom(allButInternal));
assertThat(request.validate().validationErrors(), not(empty()));
request.versionType(VersionType.INTERNAL);
request.version(randomIntBetween(0, Integer.MAX_VALUE));
assertThat(request.validate().validationErrors(), not(empty()));
}
}

View File

@ -25,11 +25,7 @@ import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.get.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -53,14 +49,7 @@ import java.util.Set;
import static java.util.Collections.singleton;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.Matchers.*;
public class GetActionIT extends ESIntegTestCase {
@ -600,7 +589,7 @@ public class GetActionIT extends ESIntegTestCase {
assertThat(response.getResponses()[1].getResponse().getSourceAsMap().get("field").toString(), equalTo("value1"));
assertThat(response.getResponses()[2].getFailure(), notNullValue());
assertThat(response.getResponses()[2].getFailure().getId(), equalTo("1"));
assertThat(response.getResponses()[2].getFailure().getMessage(), startsWith("[type1][1]: version conflict, current [1], provided [2]"));
assertThat(response.getResponses()[2].getFailure().getMessage(), startsWith("[type1][1]: version conflict"));
assertThat(response.getResponses()[2].getFailure().getFailure(), instanceOf(VersionConflictEngineException.class));
//Version from Lucene index
@ -623,7 +612,7 @@ public class GetActionIT extends ESIntegTestCase {
assertThat(response.getResponses()[1].getResponse().getSourceAsMap().get("field").toString(), equalTo("value1"));
assertThat(response.getResponses()[2].getFailure(), notNullValue());
assertThat(response.getResponses()[2].getFailure().getId(), equalTo("1"));
assertThat(response.getResponses()[2].getFailure().getMessage(), startsWith("[type1][1]: version conflict, current [1], provided [2]"));
assertThat(response.getResponses()[2].getFailure().getMessage(), startsWith("[type1][1]: version conflict"));
assertThat(response.getResponses()[2].getFailure().getFailure(), instanceOf(VersionConflictEngineException.class));
@ -648,7 +637,7 @@ public class GetActionIT extends ESIntegTestCase {
assertThat(response.getResponses()[1].getFailure(), notNullValue());
assertThat(response.getResponses()[1].getFailure().getId(), equalTo("2"));
assertThat(response.getResponses()[1].getIndex(), equalTo("test"));
assertThat(response.getResponses()[1].getFailure().getMessage(), startsWith("[type1][2]: version conflict, current [2], provided [1]"));
assertThat(response.getResponses()[1].getFailure().getMessage(), startsWith("[type1][2]: version conflict"));
assertThat(response.getResponses()[2].getId(), equalTo("2"));
assertThat(response.getResponses()[2].getIndex(), equalTo("test"));
assertThat(response.getResponses()[2].getFailure(), nullValue());
@ -674,7 +663,7 @@ public class GetActionIT extends ESIntegTestCase {
assertThat(response.getResponses()[1].getFailure(), notNullValue());
assertThat(response.getResponses()[1].getFailure().getId(), equalTo("2"));
assertThat(response.getResponses()[1].getIndex(), equalTo("test"));
assertThat(response.getResponses()[1].getFailure().getMessage(), startsWith("[type1][2]: version conflict, current [2], provided [1]"));
assertThat(response.getResponses()[1].getFailure().getMessage(), startsWith("[type1][2]: version conflict"));
assertThat(response.getResponses()[2].getId(), equalTo("2"));
assertThat(response.getResponses()[2].getIndex(), equalTo("test"));
assertThat(response.getResponses()[2].getFailure(), nullValue());

View File

@ -29,26 +29,31 @@ public class VersionTypeTests extends ESTestCase {
@Test
public void testInternalVersionConflict() throws Exception {
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(10, Versions.MATCH_ANY));
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(10, Versions.MATCH_ANY, randomBoolean()));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(10, Versions.MATCH_ANY));
// if we don't have a version in the index we accept everything
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_SET, 10));
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_SET, 10, randomBoolean()));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(Versions.NOT_SET, 10));
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_SET, Versions.MATCH_ANY));
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_SET, Versions.MATCH_ANY, randomBoolean()));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(Versions.NOT_SET, Versions.MATCH_ANY));
// if we didn't find a version (but the index does support it), we don't like it unless MATCH_ANY
assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
assertTrue(VersionType.INTERNAL.isVersionConflictForReads(Versions.NOT_FOUND, 10));
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.MATCH_ANY));
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.MATCH_ANY, randomBoolean()));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(Versions.NOT_FOUND, Versions.MATCH_ANY));
// deletes
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.MATCH_DELETED, true));
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(10, Versions.MATCH_DELETED, true));
// and the stupid usual case
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(10, 10));
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(10, 10, randomBoolean()));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(10, 10));
assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(9, 10));
assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(9, 10, randomBoolean()));
assertTrue(VersionType.INTERNAL.isVersionConflictForReads(9, 10));
assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(10, 9));
assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(10, 9, randomBoolean()));
assertTrue(VersionType.INTERNAL.isVersionConflictForReads(10, 9));
// Old indexing code, dictating behavior
@ -99,23 +104,23 @@ public class VersionTypeTests extends ESTestCase {
@Test
public void testExternalVersionConflict() throws Exception {
assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_SET, 10));
assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_SET, 10, randomBoolean()));
// MATCH_ANY must throw an exception in the case of external version, as the version must be set! it used as the new value
assertTrue(VersionType.EXTERNAL.isVersionConflictForWrites(10, Versions.MATCH_ANY));
assertTrue(VersionType.EXTERNAL.isVersionConflictForWrites(10, Versions.MATCH_ANY, randomBoolean()));
// if we didn't find a version (but the index does support it), we always accept
assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND, randomBoolean()));
assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
assertTrue(VersionType.EXTERNAL.isVersionConflictForReads(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertTrue(VersionType.EXTERNAL.isVersionConflictForReads(Versions.NOT_FOUND, 10));
assertFalse(VersionType.EXTERNAL.isVersionConflictForReads(Versions.NOT_FOUND, Versions.MATCH_ANY));
// and the standard behavior
assertTrue(VersionType.EXTERNAL.isVersionConflictForWrites(10, 10));
assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(9, 10));
assertTrue(VersionType.EXTERNAL.isVersionConflictForWrites(10, 9));
assertTrue(VersionType.EXTERNAL.isVersionConflictForWrites(10, 10, randomBoolean()));
assertFalse(VersionType.EXTERNAL.isVersionConflictForWrites(9, 10, randomBoolean()));
assertTrue(VersionType.EXTERNAL.isVersionConflictForWrites(10, 9, randomBoolean()));
assertFalse(VersionType.EXTERNAL.isVersionConflictForReads(10, 10));
assertTrue(VersionType.EXTERNAL.isVersionConflictForReads(9, 10));
@ -137,14 +142,14 @@ public class VersionTypeTests extends ESTestCase {
@Test
public void testExternalGTEVersionConflict() throws Exception {
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_SET, 10));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_SET, 10, randomBoolean()));
// MATCH_ANY must throw an exception in the case of external version, as the version must be set! it used as the new value
assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(10, Versions.MATCH_ANY));
assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(10, Versions.MATCH_ANY, randomBoolean()));
// if we didn't find a version (but the index does support it), we always accept
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND, randomBoolean()));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForReads(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForReads(Versions.NOT_FOUND, 10));
@ -152,9 +157,9 @@ public class VersionTypeTests extends ESTestCase {
// and the standard behavior
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(10, 10));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(9, 10));
assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(10, 9));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(10, 10, randomBoolean()));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(9, 10, randomBoolean()));
assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForWrites(10, 9, randomBoolean()));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflictForReads(10, 10));
assertTrue(VersionType.EXTERNAL_GTE.isVersionConflictForReads(9, 10));
@ -166,14 +171,20 @@ public class VersionTypeTests extends ESTestCase {
@Test
public void testForceVersionConflict() throws Exception {
assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_SET, 10));
// MATCH_ANY must throw an exception in the case of external version, as the version must be set! it used as the new value
assertTrue(VersionType.FORCE.isVersionConflictForWrites(10, Versions.MATCH_ANY));
assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_SET, 10, randomBoolean()));
// MATCH_ANY must throw an exception in the case of force version, as the version must be set! it used as the new value
try {
VersionType.FORCE.isVersionConflictForWrites(10, Versions.MATCH_ANY, randomBoolean());
fail();
} catch (IllegalStateException e) {
//yes!!
}
// if we didn't find a version (but the index does support it), we always accept
assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND, randomBoolean()));
assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
assertFalse(VersionType.FORCE.isVersionConflictForReads(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertFalse(VersionType.FORCE.isVersionConflictForReads(Versions.NOT_FOUND, 10));
@ -181,9 +192,9 @@ public class VersionTypeTests extends ESTestCase {
// and the standard behavior
assertFalse(VersionType.FORCE.isVersionConflictForWrites(10, 10));
assertFalse(VersionType.FORCE.isVersionConflictForWrites(9, 10));
assertFalse(VersionType.FORCE.isVersionConflictForWrites(10, 9));
assertFalse(VersionType.FORCE.isVersionConflictForWrites(10, 10, randomBoolean()));
assertFalse(VersionType.FORCE.isVersionConflictForWrites(9, 10, randomBoolean()));
assertFalse(VersionType.FORCE.isVersionConflictForWrites(10, 9, randomBoolean()));
assertFalse(VersionType.FORCE.isVersionConflictForReads(10, 10));
assertFalse(VersionType.FORCE.isVersionConflictForReads(9, 10));
assertFalse(VersionType.FORCE.isVersionConflictForReads(10, 9));

View File

@ -96,7 +96,6 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import static org.elasticsearch.common.settings.Settings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
@ -105,8 +104,6 @@ import static org.hamcrest.Matchers.*;
public class InternalEngineTests extends ESTestCase {
private static final Pattern PARSE_LEGACY_ID_PATTERN = Pattern.compile("^" + Translog.TRANSLOG_FILE_PREFIX + "(\\d+)((\\.recovering))?$");
protected final ShardId shardId = new ShardId(new Index("index"), 1);
protected ThreadPool threadPool;
@ -273,10 +270,10 @@ public class InternalEngineTests extends ESTestCase {
// create a doc and refresh
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.create(new Engine.Create(newUid("1"), doc));
engine.index(new Engine.Index(newUid("1"), doc));
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, null);
engine.create(new Engine.Create(newUid("2"), doc2));
engine.index(new Engine.Index(newUid("2"), doc2));
engine.refresh("test");
segments = engine.segments(false);
@ -310,7 +307,7 @@ public class InternalEngineTests extends ESTestCase {
engine.onSettingsChanged();
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
engine.create(new Engine.Create(newUid("3"), doc3));
engine.index(new Engine.Index(newUid("3"), doc3));
engine.refresh("test");
segments = engine.segments(false);
@ -358,7 +355,7 @@ public class InternalEngineTests extends ESTestCase {
engine.config().setCompoundOnFlush(true);
engine.onSettingsChanged();
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
engine.create(new Engine.Create(newUid("4"), doc4));
engine.index(new Engine.Index(newUid("4"), doc4));
engine.refresh("test");
segments = engine.segments(false);
@ -392,7 +389,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(segments.isEmpty(), equalTo(true));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.create(new Engine.Create(newUid("1"), doc));
engine.index(new Engine.Index(newUid("1"), doc));
engine.refresh("test");
segments = engine.segments(true);
@ -400,10 +397,10 @@ public class InternalEngineTests extends ESTestCase {
assertThat(segments.get(0).ramTree, notNullValue());
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, null);
engine.create(new Engine.Create(newUid("2"), doc2));
engine.index(new Engine.Index(newUid("2"), doc2));
engine.refresh("test");
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
engine.create(new Engine.Create(newUid("3"), doc3));
engine.index(new Engine.Index(newUid("3"), doc3));
engine.refresh("test");
segments = engine.segments(true);
@ -473,7 +470,7 @@ public class InternalEngineTests extends ESTestCase {
Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
engine.create(new Engine.Create(newUid("1"), doc));
engine.index(new Engine.Index(newUid("1"), doc));
CommitStats stats1 = engine.commitStats();
assertThat(stats1.getGeneration(), greaterThan(0l));
@ -524,7 +521,7 @@ public class InternalEngineTests extends ESTestCase {
/* */
public void testConcurrentGetAndFlush() throws Exception {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.create(new Engine.Create(newUid("1"), doc));
engine.index(new Engine.Index(newUid("1"), doc));
final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>();
latestGetResult.set(engine.get(new Engine.Get(true, newUid("1"))));
@ -569,7 +566,7 @@ public class InternalEngineTests extends ESTestCase {
Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
engine.create(new Engine.Create(newUid("1"), doc));
engine.index(new Engine.Index(newUid("1"), doc));
// its not there...
searchResult = engine.acquireSearcher("test");
@ -661,7 +658,7 @@ public class InternalEngineTests extends ESTestCase {
document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
engine.create(new Engine.Create(newUid("1"), doc));
engine.index(new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED));
// its not there...
searchResult = engine.acquireSearcher("test");
@ -722,7 +719,7 @@ public class InternalEngineTests extends ESTestCase {
// create a document
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.create(new Engine.Create(newUid("1"), doc));
engine.index(new Engine.Index(newUid("1"), doc));
// its not there...
searchResult = engine.acquireSearcher("test");
@ -758,7 +755,7 @@ public class InternalEngineTests extends ESTestCase {
new LogByteSizeMergePolicy()), false)) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.create(new Engine.Create(newUid("1"), doc));
engine.index(new Engine.Index(newUid("1"), doc));
Engine.CommitId commitID = engine.flush();
assertThat(commitID, equalTo(new Engine.CommitId(store.readLastCommittedSegmentsInfo().getId())));
byte[] wrongBytes = Base64.decode(commitID.toString());
@ -766,7 +763,7 @@ public class InternalEngineTests extends ESTestCase {
Engine.CommitId wrongId = new Engine.CommitId(wrongBytes);
assertEquals("should fail to sync flush with wrong id (but no docs)", engine.syncFlush(syncId + "1", wrongId),
Engine.SyncedFlushResult.COMMIT_MISMATCH);
engine.create(new Engine.Create(newUid("2"), doc));
engine.index(new Engine.Index(newUid("2"), doc));
assertEquals("should fail to sync flush with right id but pending doc", engine.syncFlush(syncId + "2", commitID),
Engine.SyncedFlushResult.PENDING_OPERATIONS);
commitID = engine.flush();
@ -780,7 +777,7 @@ public class InternalEngineTests extends ESTestCase {
public void testSycnedFlushSurvivesEngineRestart() throws IOException {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.create(new Engine.Create(newUid("1"), doc));
engine.index(new Engine.Index(newUid("1"), doc));
final Engine.CommitId commitID = engine.flush();
assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
Engine.SyncedFlushResult.SUCCESS);
@ -799,14 +796,14 @@ public class InternalEngineTests extends ESTestCase {
public void testSycnedFlushVanishesOnReplay() throws IOException {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.create(new Engine.Create(newUid("1"), doc));
engine.index(new Engine.Index(newUid("1"), doc));
final Engine.CommitId commitID = engine.flush();
assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
Engine.SyncedFlushResult.SUCCESS);
assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
doc = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), new BytesArray("{}"), null);
engine.create(new Engine.Create(newUid("2"), doc));
engine.index(new Engine.Index(newUid("2"), doc));
EngineConfig config = engine.config();
engine.close();
final MockDirectoryWrapper directory = DirectoryUtils.getLeaf(store.directory(), MockDirectoryWrapper.class);
@ -823,27 +820,15 @@ public class InternalEngineTests extends ESTestCase {
@Test
public void testVersioningNewCreate() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Create create = new Engine.Create(newUid("1"), doc);
engine.create(create);
Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED);
engine.index(create);
assertThat(create.version(), equalTo(1l));
create = new Engine.Create(newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0);
replicaEngine.create(create);
create = new Engine.Index(newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0);
replicaEngine.index(create);
assertThat(create.version(), equalTo(1l));
}
@Test
public void testExternalVersioningNewCreate() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Create create = new Engine.Create(newUid("1"), doc, 12, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, 0);
engine.create(create);
assertThat(create.version(), equalTo(12l));
create = new Engine.Create(newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0);
replicaEngine.create(create);
assertThat(create.version(), equalTo(12l));
}
@Test
public void testVersioningNewIndex() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
@ -1107,9 +1092,9 @@ public class InternalEngineTests extends ESTestCase {
}
// we shouldn't be able to create as well
Engine.Create create = new Engine.Create(newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0);
Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
try {
engine.create(create);
engine.index(create);
} catch (VersionConflictEngineException e) {
// all is well
}
@ -1164,9 +1149,9 @@ public class InternalEngineTests extends ESTestCase {
}
// we shouldn't be able to create as well
Engine.Create create = new Engine.Create(newUid("1"), doc, 2l, VersionType.INTERNAL, PRIMARY, 0);
Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
try {
engine.create(create);
engine.index(create);
} catch (VersionConflictEngineException e) {
// all is well
}
@ -1175,15 +1160,15 @@ public class InternalEngineTests extends ESTestCase {
@Test
public void testVersioningCreateExistsException() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Create create = new Engine.Create(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
engine.create(create);
Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
engine.index(create);
assertThat(create.version(), equalTo(1l));
create = new Engine.Create(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
try {
engine.create(create);
engine.index(create);
fail();
} catch (DocumentAlreadyExistsException e) {
} catch (VersionConflictEngineException e) {
// all is well
}
}
@ -1191,17 +1176,17 @@ public class InternalEngineTests extends ESTestCase {
@Test
public void testVersioningCreateExistsExceptionWithFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Create create = new Engine.Create(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
engine.create(create);
Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
engine.index(create);
assertThat(create.version(), equalTo(1l));
engine.flush();
create = new Engine.Create(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0);
create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
try {
engine.create(create);
engine.index(create);
fail();
} catch (DocumentAlreadyExistsException e) {
} catch (VersionConflictEngineException e) {
// all is well
}
}
@ -1365,13 +1350,13 @@ public class InternalEngineTests extends ESTestCase {
try {
// First, with DEBUG, which should NOT log IndexWriter output:
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.create(new Engine.Create(newUid("1"), doc));
engine.index(new Engine.Index(newUid("1"), doc));
engine.flush();
assertFalse(mockAppender.sawIndexWriterMessage);
// Again, with TRACE, which should log IndexWriter output:
rootLogger.setLevel(Level.TRACE);
engine.create(new Engine.Create(newUid("2"), doc));
engine.index(new Engine.Index(newUid("2"), doc));
engine.flush();
assertTrue(mockAppender.sawIndexWriterMessage);
@ -1400,14 +1385,14 @@ public class InternalEngineTests extends ESTestCase {
try {
// First, with DEBUG, which should NOT log IndexWriter output:
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.create(new Engine.Create(newUid("1"), doc));
engine.index(new Engine.Index(newUid("1"), doc));
engine.flush();
assertFalse(mockAppender.sawIndexWriterMessage);
assertFalse(mockAppender.sawIndexWriterIFDMessage);
// Again, with TRACE, which should only log IndexWriter IFD output:
iwIFDLogger.setLevel(Level.TRACE);
engine.create(new Engine.Create(newUid("2"), doc));
engine.index(new Engine.Index(newUid("2"), doc));
engine.flush();
assertFalse(mockAppender.sawIndexWriterMessage);
assertTrue(mockAppender.sawIndexWriterIFDMessage);
@ -1607,8 +1592,8 @@ public class InternalEngineTests extends ESTestCase {
final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
engine.create(firstIndexRequest);
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1l));
}
engine.refresh("test");
@ -1660,8 +1645,8 @@ public class InternalEngineTests extends ESTestCase {
final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
engine.create(firstIndexRequest);
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1l));
}
engine.refresh("test");
@ -1761,8 +1746,8 @@ public class InternalEngineTests extends ESTestCase {
final int numExtraDocs = randomIntBetween(1, 10);
for (int i = 0; i < numExtraDocs; i++) {
ParsedDocument doc = testParsedDocument("extra" + Integer.toString(i), "extra" + Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
engine.create(firstIndexRequest);
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1l));
}
engine.refresh("test");
@ -1790,8 +1775,8 @@ public class InternalEngineTests extends ESTestCase {
final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
engine.create(firstIndexRequest);
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1l));
}
engine.refresh("test");
@ -1839,8 +1824,8 @@ public class InternalEngineTests extends ESTestCase {
int randomId = randomIntBetween(numDocs + 1, numDocs + 10);
String uuidValue = "test#" + Integer.toString(randomId);
ParsedDocument doc = testParsedDocument(uuidValue, Integer.toString(randomId), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
Engine.Create firstIndexRequest = new Engine.Create(newUid(uuidValue), doc, 1, VersionType.EXTERNAL, PRIMARY, System.nanoTime());
engine.create(firstIndexRequest);
Engine.Index firstIndexRequest = new Engine.Index(newUid(uuidValue), doc, 1, VersionType.EXTERNAL, PRIMARY, System.nanoTime());
engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1l));
if (flush) {
engine.flush();
@ -1920,8 +1905,8 @@ public class InternalEngineTests extends ESTestCase {
final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
Engine.Create firstIndexRequest = new Engine.Create(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
engine.create(firstIndexRequest);
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
engine.index(firstIndexRequest);
assertThat(firstIndexRequest.version(), equalTo(1l));
}
engine.refresh("test");
@ -1939,7 +1924,7 @@ public class InternalEngineTests extends ESTestCase {
engine.close();
Translog translog = new Translog(new TranslogConfig(shardId, createTempDir(), Settings.EMPTY, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool));
translog.add(new Translog.Create("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index("test", "SomeBogusId", "{}".getBytes(Charset.forName("UTF-8"))));
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());
translog.close();

View File

@ -236,7 +236,7 @@ public class ShadowEngineTests extends ESTestCase {
public void testCommitStats() {
// create a doc and refresh
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
primaryEngine.create(new Engine.Create(newUid("1"), doc));
primaryEngine.index(new Engine.Index(newUid("1"), doc));
CommitStats stats1 = replicaEngine.commitStats();
assertThat(stats1.getGeneration(), greaterThan(0l));
@ -271,10 +271,10 @@ public class ShadowEngineTests extends ESTestCase {
// create a doc and refresh
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
primaryEngine.create(new Engine.Create(newUid("1"), doc));
primaryEngine.index(new Engine.Index(newUid("1"), doc));
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, null);
primaryEngine.create(new Engine.Create(newUid("2"), doc2));
primaryEngine.index(new Engine.Index(newUid("2"), doc2));
primaryEngine.refresh("test");
segments = primaryEngine.segments(false);
@ -334,7 +334,7 @@ public class ShadowEngineTests extends ESTestCase {
primaryEngine.onSettingsChanged();
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
primaryEngine.create(new Engine.Create(newUid("3"), doc3));
primaryEngine.index(new Engine.Index(newUid("3"), doc3));
primaryEngine.refresh("test");
segments = primaryEngine.segments(false);
@ -407,7 +407,7 @@ public class ShadowEngineTests extends ESTestCase {
primaryEngine.onSettingsChanged();
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
primaryEngine.create(new Engine.Create(newUid("4"), doc4));
primaryEngine.index(new Engine.Index(newUid("4"), doc4));
primaryEngine.refresh("test");
segments = primaryEngine.segments(false);
@ -441,7 +441,7 @@ public class ShadowEngineTests extends ESTestCase {
assertThat(segments.isEmpty(), equalTo(true));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
primaryEngine.create(new Engine.Create(newUid("1"), doc));
primaryEngine.index(new Engine.Index(newUid("1"), doc));
primaryEngine.refresh("test");
segments = primaryEngine.segments(true);
@ -449,10 +449,10 @@ public class ShadowEngineTests extends ESTestCase {
assertThat(segments.get(0).ramTree, notNullValue());
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, null);
primaryEngine.create(new Engine.Create(newUid("2"), doc2));
primaryEngine.index(new Engine.Index(newUid("2"), doc2));
primaryEngine.refresh("test");
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, null);
primaryEngine.create(new Engine.Create(newUid("3"), doc3));
primaryEngine.index(new Engine.Index(newUid("3"), doc3));
primaryEngine.refresh("test");
segments = primaryEngine.segments(true);
@ -480,7 +480,7 @@ public class ShadowEngineTests extends ESTestCase {
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
try {
replicaEngine.create(new Engine.Create(newUid("1"), doc));
replicaEngine.index(new Engine.Index(newUid("1"), doc));
fail("should have thrown an exception");
} catch (UnsupportedOperationException e) {}
replicaEngine.refresh("test");
@ -517,7 +517,7 @@ public class ShadowEngineTests extends ESTestCase {
document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
primaryEngine.create(new Engine.Create(newUid("1"), doc));
primaryEngine.index(new Engine.Index(newUid("1"), doc));
primaryEngine.flush();
replicaEngine.refresh("test");
@ -573,7 +573,7 @@ public class ShadowEngineTests extends ESTestCase {
ParseContext.Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
primaryEngine.create(new Engine.Create(newUid("1"), doc));
primaryEngine.index(new Engine.Index(newUid("1"), doc));
// its not there...
searchResult = primaryEngine.acquireSearcher("test");
@ -700,7 +700,7 @@ public class ShadowEngineTests extends ESTestCase {
document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
primaryEngine.create(new Engine.Create(newUid("1"), doc));
primaryEngine.index(new Engine.Index(newUid("1"), doc));
// its not there...
searchResult = primaryEngine.acquireSearcher("test");
@ -784,7 +784,7 @@ public class ShadowEngineTests extends ESTestCase {
// create a document
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
primaryEngine.create(new Engine.Create(newUid("1"), doc));
primaryEngine.index(new Engine.Index(newUid("1"), doc));
// its not there...
searchResult = primaryEngine.acquireSearcher("test");
@ -830,7 +830,7 @@ public class ShadowEngineTests extends ESTestCase {
@Test
public void testFailEngineOnCorruption() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
primaryEngine.create(new Engine.Create(newUid("1"), doc));
primaryEngine.index(new Engine.Index(newUid("1"), doc));
primaryEngine.flush();
MockDirectoryWrapper leaf = DirectoryUtils.getLeaf(replicaEngine.config().getStore().directory(), MockDirectoryWrapper.class);
leaf.setRandomIOExceptionRate(1.0);
@ -869,7 +869,7 @@ public class ShadowEngineTests extends ESTestCase {
public void testFailStart() throws IOException {
// Need a commit point for this
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
primaryEngine.create(new Engine.Create(newUid("1"), doc));
primaryEngine.index(new Engine.Index(newUid("1"), doc));
primaryEngine.flush();
// this test fails if any reader, searcher or directory is not closed - MDW FTW
@ -957,7 +957,7 @@ public class ShadowEngineTests extends ESTestCase {
ParseContext.Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, null);
pEngine.create(new Engine.Create(newUid("1"), doc));
pEngine.index(new Engine.Index(newUid("1"), doc));
pEngine.flush(true, true);
t.join();

View File

@ -41,6 +41,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -69,7 +70,6 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
@ -95,10 +95,7 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.metadata.IndexMetaData.EMPTY_PARAMS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
@ -628,9 +625,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
shardIndexingService.addListener(new IndexingOperationListener() {
@Override
public Engine.Index preIndex(Engine.Index index) {
public Engine.Index preIndex(Engine.Index operation) {
preIndexCalled.set(true);
return super.preIndex(index);
return super.preIndex(operation);
}
});

View File

@ -183,14 +183,14 @@ public class TranslogTests extends ESTestCase {
@Test
public void testRead() throws IOException {
Translog.Location loc1 = translog.add(new Translog.Create("test", "1", new byte[]{1}));
Translog.Location loc2 = translog.add(new Translog.Create("test", "2", new byte[]{2}));
Translog.Location loc1 = translog.add(new Translog.Index("test", "1", new byte[]{1}));
Translog.Location loc2 = translog.add(new Translog.Index("test", "2", new byte[]{2}));
assertThat(translog.read(loc1).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{2})));
translog.sync();
assertThat(translog.read(loc1).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{1})));
assertThat(translog.read(loc2).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{2})));
Translog.Location loc3 = translog.add(new Translog.Create("test", "2", new byte[]{3}));
Translog.Location loc3 = translog.add(new Translog.Index("test", "2", new byte[]{3}));
assertThat(translog.read(loc3).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{3})));
translog.sync();
assertThat(translog.read(loc3).getSource().source.toBytesArray(), equalTo(new BytesArray(new byte[]{3})));
@ -215,19 +215,13 @@ public class TranslogTests extends ESTestCase {
assertThat(snapshot, SnapshotMatchers.size(0));
snapshot.close();
addToTranslogAndList(translog, ops, new Translog.Create("test", "1", new byte[]{1}));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.estimatedTotalOperations(), equalTo(1));
snapshot.close();
addToTranslogAndList(translog, ops, new Translog.Index("test", "2", new byte[]{2}));
addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1}));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.estimatedTotalOperations(), equalTo(ops.size()));
snapshot.close();
addToTranslogAndList(translog, ops, new Translog.Delete(newUid("3")));
addToTranslogAndList(translog, ops, new Translog.Delete(newUid("2")));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.estimatedTotalOperations(), equalTo(ops.size()));
@ -235,17 +229,13 @@ public class TranslogTests extends ESTestCase {
snapshot = translog.newSnapshot();
Translog.Create create = (Translog.Create) snapshot.next();
assertThat(create != null, equalTo(true));
assertThat(create.source().toBytes(), equalTo(new byte[]{1}));
Translog.Index index = (Translog.Index) snapshot.next();
assertThat(index != null, equalTo(true));
assertThat(index.source().toBytes(), equalTo(new byte[]{2}));
assertThat(index.source().toBytes(), equalTo(new byte[]{1}));
Translog.Delete delete = (Translog.Delete) snapshot.next();
assertThat(delete != null, equalTo(true));
assertThat(delete.uid(), equalTo(newUid("3")));
assertThat(delete.uid(), equalTo(newUid("2")));
assertThat(snapshot.next(), equalTo(null));
@ -290,28 +280,22 @@ public class TranslogTests extends ESTestCase {
assertThat((int) firstOperationPosition, greaterThan(CodecUtil.headerLength(TranslogWriter.TRANSLOG_CODEC)));
assertThat(lastSize, equalTo(firstOperationPosition));
translog.add(new Translog.Create("test", "1", new byte[]{1}));
translog.add(new Translog.Index("test", "1", new byte[]{1}));
stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(1l));
assertThat(stats.translogSizeInBytes().bytes(), greaterThan(lastSize));
lastSize = stats.translogSizeInBytes().bytes();
translog.add(new Translog.Index("test", "2", new byte[]{2}));
translog.add(new Translog.Delete(newUid("2")));
stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(2l));
assertThat(stats.translogSizeInBytes().bytes(), greaterThan(lastSize));
lastSize = stats.translogSizeInBytes().bytes();
translog.add(new Translog.Delete(newUid("3")));
stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(3l));
assertThat(stats.translogSizeInBytes().bytes(), greaterThan(lastSize));
lastSize = stats.translogSizeInBytes().bytes();
translog.add(new Translog.Delete(newUid("4")));
translog.prepareCommit();
stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(4l));
assertThat(stats.estimatedNumberOfOperations(), equalTo(3l));
assertThat(stats.translogSizeInBytes().bytes(), greaterThan(lastSize));
translog.commit();
@ -327,7 +311,7 @@ public class TranslogTests extends ESTestCase {
assertThat(snapshot, SnapshotMatchers.size(0));
snapshot.close();
addToTranslogAndList(translog, ops, new Translog.Create("test", "1", new byte[]{1}));
addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1}));
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
@ -354,7 +338,7 @@ public class TranslogTests extends ESTestCase {
assertThat(snapshot, SnapshotMatchers.size(0));
snapshot.close();
addToTranslogAndList(translog, ops, new Translog.Create("test", "1", new byte[]{1}));
addToTranslogAndList(translog, ops, new Translog.Index("test", "1", new byte[]{1}));
Translog.Snapshot snapshot1 = translog.newSnapshot();
addToTranslogAndList(translog, ops, new Translog.Index("test", "2", new byte[]{2}));
@ -375,7 +359,7 @@ public class TranslogTests extends ESTestCase {
public void testSnapshotOnClosedTranslog() throws IOException {
assertTrue(Files.exists(translogDir.resolve(Translog.getFilename(1))));
translog.add(new Translog.Create("test", "1", new byte[]{1}));
translog.add(new Translog.Index("test", "1", new byte[]{1}));
translog.close();
try {
Translog.Snapshot snapshot = translog.newSnapshot();
@ -388,7 +372,7 @@ public class TranslogTests extends ESTestCase {
@Test
public void deleteOnSnapshotRelease() throws Exception {
ArrayList<Translog.Operation> firstOps = new ArrayList<>();
addToTranslogAndList(translog, firstOps, new Translog.Create("test", "1", new byte[]{1}));
addToTranslogAndList(translog, firstOps, new Translog.Index("test", "1", new byte[]{1}));
Translog.Snapshot firstSnapshot = translog.newSnapshot();
assertThat(firstSnapshot.estimatedTotalOperations(), equalTo(1));
@ -463,10 +447,7 @@ public class TranslogTests extends ESTestCase {
Translog.Operation op;
switch (randomFrom(Translog.Operation.Type.values())) {
case CREATE:
op = new Translog.Create("test", threadId + "_" + opCount,
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
break;
case SAVE:
case INDEX:
op = new Translog.Index("test", threadId + "_" + opCount,
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
break;
@ -508,7 +489,7 @@ public class TranslogTests extends ESTestCase {
Translog.Operation expectedOp = locationOperation.operation;
assertEquals(expectedOp.opType(), op.opType());
switch (op.opType()) {
case SAVE:
case INDEX:
Translog.Index indexOp = (Translog.Index) op;
Translog.Index expIndexOp = (Translog.Index) expectedOp;
assertEquals(expIndexOp.id(), indexOp.id());
@ -518,16 +499,6 @@ public class TranslogTests extends ESTestCase {
assertEquals(expIndexOp.version(), indexOp.version());
assertEquals(expIndexOp.versionType(), indexOp.versionType());
break;
case CREATE:
Translog.Create createOp = (Translog.Create) op;
Translog.Create expCreateOp = (Translog.Create) expectedOp;
assertEquals(expCreateOp.id(), createOp.id());
assertEquals(expCreateOp.routing(), createOp.routing());
assertEquals(expCreateOp.type(), createOp.type());
assertEquals(expCreateOp.source(), createOp.source());
assertEquals(expCreateOp.version(), createOp.version());
assertEquals(expCreateOp.versionType(), createOp.versionType());
break;
case DELETE:
Translog.Delete delOp = (Translog.Delete) op;
Translog.Delete expDelOp = (Translog.Delete) expectedOp;
@ -550,7 +521,7 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
for (int op = 0; op < translogOperations; op++) {
String ascii = randomAsciiOfLengthBetween(1, 50);
locations.add(translog.add(new Translog.Create("test", "" + op, ascii.getBytes("UTF-8"))));
locations.add(translog.add(new Translog.Index("test", "" + op, ascii.getBytes("UTF-8"))));
}
translog.sync();
@ -574,7 +545,7 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
for (int op = 0; op < translogOperations; op++) {
String ascii = randomAsciiOfLengthBetween(1, 50);
locations.add(translog.add(new Translog.Create("test", "" + op, ascii.getBytes("UTF-8"))));
locations.add(translog.add(new Translog.Index("test", "" + op, ascii.getBytes("UTF-8"))));
}
translog.sync();
@ -638,7 +609,7 @@ public class TranslogTests extends ESTestCase {
@Test
public void testVerifyTranslogIsNotDeleted() throws IOException {
assertFileIsPresent(translog, 1);
translog.add(new Translog.Create("test", "1", new byte[]{1}));
translog.add(new Translog.Index("test", "1", new byte[]{1}));
Translog.Snapshot snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(1));
assertFileIsPresent(translog, 1);
@ -686,9 +657,7 @@ public class TranslogTests extends ESTestCase {
final Translog.Operation op;
switch (Translog.Operation.Type.values()[((int) (id % Translog.Operation.Type.values().length))]) {
case CREATE:
op = new Translog.Create("type", "" + id, new byte[]{(byte) id});
break;
case SAVE:
case INDEX:
op = new Translog.Index("type", "" + id, new byte[]{(byte) id});
break;
case DELETE:
@ -830,12 +799,12 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
int count = 0;
for (int op = 0; op < translogOperations; op++) {
final Translog.Location location = translog.add(new Translog.Create("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
final Translog.Location location = translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
if (randomBoolean()) {
assertTrue("at least one operation pending", translog.syncNeeded());
assertTrue("this operation has not been synced", translog.ensureSynced(location));
assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced
translog.add(new Translog.Create("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
assertTrue("one pending operation", translog.syncNeeded());
assertFalse("this op has been synced before", translog.ensureSynced(location)); // not syncing now
assertTrue("we only synced a previous operation yet", translog.syncNeeded());
@ -858,7 +827,7 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
int count = 0;
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Create("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
if (rarely() && translogOperations > op+1) {
translog.commit();
}
@ -887,14 +856,14 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
int lastSynced = -1;
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (frequently()) {
translog.sync();
lastSynced = op;
}
}
assertEquals(translogOperations, translog.totalOperations());
final Translog.Location lastLocation = translog.add(new Translog.Create("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
final Translog.Location lastLocation = translog.add(new Translog.Index("test", "" + translogOperations, Integer.toString(translogOperations).getBytes(Charset.forName("UTF-8"))));
final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME));
try (final ImmutableTranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(translog.currentFileGeneration())), checkpoint)) {
@ -975,7 +944,7 @@ public class TranslogTests extends ESTestCase {
int minUncommittedOp = -1;
final boolean commitOften = randomBoolean();
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
final boolean commit = commitOften ? frequently() : rarely();
if (commit && op < translogOperations-1) {
translog.commit();
@ -1017,7 +986,7 @@ public class TranslogTests extends ESTestCase {
Translog.TranslogGeneration translogGeneration = null;
final boolean sync = randomBoolean();
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (op == prepareOp) {
translogGeneration = translog.getGeneration();
translog.prepareCommit();
@ -1068,7 +1037,7 @@ public class TranslogTests extends ESTestCase {
List<Translog.Operation> ops = new ArrayList<>();
int translogOperations = randomIntBetween(10, 100);
for (int op = 0; op < translogOperations; op++) {
Translog.Create test = new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")));
Translog.Index test = new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")));
ops.add(test);
}
Translog.writeOperations(out, ops);
@ -1083,8 +1052,8 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(10, 100);
try(Translog translog2 = create(createTempDir())) {
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations2.add(translog2.add(new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations2.add(translog2.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
}
int iters = randomIntBetween(10, 100);
for (int i = 0; i < iters; i++) {
@ -1110,7 +1079,7 @@ public class TranslogTests extends ESTestCase {
int translogOperations = randomIntBetween(1, 10);
int firstUncommitted = 0;
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Create("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (randomBoolean()) {
translog.commit();
firstUncommitted = op + 1;

View File

@ -45,7 +45,7 @@ public class TranslogVersionTests extends ESTestCase {
assertThat("a version0 stream is returned", reader instanceof LegacyTranslogReader, equalTo(true));
try (final Translog.Snapshot snapshot = reader.newSnapshot()) {
final Translog.Operation operation = snapshot.next();
assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.SAVE, equalTo(true));
assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.INDEX, equalTo(true));
Translog.Index op = (Translog.Index) operation;
assertThat(op.id(), equalTo("1"));
assertThat(op.type(), equalTo("doc"));
@ -73,8 +73,8 @@ public class TranslogVersionTests extends ESTestCase {
Translog.Operation operation = snapshot.next();
assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.CREATE, equalTo(true));
Translog.Create op = (Translog.Create) operation;
assertThat("operation is the correct type correctly", operation.opType() == Translog.Operation.Type.INDEX, equalTo(true));
Translog.Index op = (Translog.Index) operation;
assertThat(op.id(), equalTo("Bwiq98KFSb6YjJQGeSpeiw"));
assertThat(op.type(), equalTo("doc"));
assertThat(op.source().toUtf8(), equalTo("{\"body\": \"foo\"}"));

View File

@ -18,15 +18,6 @@
*/
package org.elasticsearch.versioning;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkResponse;
@ -37,12 +28,15 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
@ -100,7 +94,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
}
// deleting with a lower version works.
long v= randomIntBetween(12,14);
long v = randomIntBetween(12, 14);
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(v).setVersionType(VersionType.FORCE).get();
assertThat(deleteResponse.isFound(), equalTo(true));
assertThat(deleteResponse.getVersion(), equalTo(v));
@ -136,7 +130,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
VersionConflictEngineException.class);
// Delete with a higher or equal version deletes all versions up to the given one.
long v= randomIntBetween(14,17);
long v = randomIntBetween(14, 17);
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(v).setVersionType(VersionType.EXTERNAL_GTE).execute().actionGet();
assertThat(deleteResponse.isFound(), equalTo(true));
assertThat(deleteResponse.getVersion(), equalTo(v));
@ -206,8 +200,8 @@ public class SimpleVersioningIT extends ESIntegTestCase {
assertThat(deleteResponse.getVersion(), equalTo(20l));
// Make sure that the next delete will be GC. Note we do it on the index settings so it will be cleaned up
HashMap<String,Object> newSettings = new HashMap<>();
newSettings.put("index.gc_deletes",-1);
HashMap<String, Object> newSettings = new HashMap<>();
newSettings.put("index.gc_deletes", -1);
client().admin().indices().prepareUpdateSettings("test").setSettings(newSettings).execute().actionGet();
Thread.sleep(300); // gc works based on estimated sampled time. Give it a chance...
@ -221,7 +215,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
public void testRequireUnitsOnUpdateSettings() throws Exception {
createIndex("test");
ensureGreen();
HashMap<String,Object> newSettings = new HashMap<>();
HashMap<String, Object> newSettings = new HashMap<>();
newSettings.put("index.gc_deletes", "42");
try {
client().admin().indices().prepareUpdateSettings("test").setSettings(newSettings).execute().actionGet();
@ -266,18 +260,8 @@ public class SimpleVersioningIT extends ESIntegTestCase {
VersionConflictEngineException.class);
assertThrows(
client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(),
client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").execute(),
VersionConflictEngineException.class);
assertThrows(
client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(),
VersionConflictEngineException.class);
assertThrows(
client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(2).execute(),
DocumentAlreadyExistsException.class);
assertThrows(
client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(2).execute(),
DocumentAlreadyExistsException.class);
assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);
@ -334,10 +318,8 @@ public class SimpleVersioningIT extends ESIntegTestCase {
assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute(),
VersionConflictEngineException.class);
assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(),
VersionConflictEngineException.class);
assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute(),
assertThrows(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").execute(),
VersionConflictEngineException.class);
assertThrows(client().prepareDelete("test", "type", "1").setVersion(1).execute(), VersionConflictEngineException.class);
@ -408,6 +390,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
}
ids = new IDSource() {
int upto;
@Override
public String next() {
return Integer.toString(upto++);
@ -423,6 +406,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
final int radix = TestUtil.nextInt(random, Character.MIN_RADIX, Character.MAX_RADIX);
final String zeroPad = String.format(Locale.ROOT, "%0" + TestUtil.nextInt(random, 4, 20) + "d", 0);
int upto;
@Override
public String next() {
String s = Integer.toString(upto++);
@ -438,6 +422,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
ids = new IDSource() {
final int radix = TestUtil.nextInt(random, Character.MIN_RADIX, Character.MAX_RADIX);
int upto;
@Override
public String next() {
return Long.toString(random.nextLong() & 0x3ffffffffffffffL, radix);
@ -453,6 +438,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
final int radix = TestUtil.nextInt(random, Character.MIN_RADIX, Character.MAX_RADIX);
final String zeroPad = String.format(Locale.ROOT, "%015d", 0);
int upto;
@Override
public String next() {
return Long.toString(random.nextLong() & 0x3ffffffffffffffL, radix);
@ -547,7 +533,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
// TODO: not great we don't test deletes GC here:
// We test deletes, but can't rely on wall-clock delete GC:
HashMap<String,Object> newSettings = new HashMap<>();
HashMap<String, Object> newSettings = new HashMap<>();
newSettings.put("index.gc_deletes", "1000000h");
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(newSettings).execute().actionGet());
@ -584,14 +570,14 @@ public class SimpleVersioningIT extends ESIntegTestCase {
// Attach random versions to them:
long version = 0;
final IDAndVersion[] idVersions = new IDAndVersion[TestUtil.nextInt(random, numIDs/2, numIDs*(TEST_NIGHTLY ? 8 : 2))];
final Map<String,IDAndVersion> truth = new HashMap<>();
final IDAndVersion[] idVersions = new IDAndVersion[TestUtil.nextInt(random, numIDs / 2, numIDs * (TEST_NIGHTLY ? 8 : 2))];
final Map<String, IDAndVersion> truth = new HashMap<>();
if (VERBOSE) {
System.out.println("TEST: use " + numIDs + " ids; " + idVersions.length + " operations");
}
for(int i=0;i<idVersions.length;i++) {
for (int i = 0; i < idVersions.length; i++) {
if (useMonotonicVersion) {
version += TestUtil.nextInt(random, 1, 10);
@ -612,7 +598,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
}
// Shuffle
for(int i = idVersions.length - 1; i > 0; i--) {
for (int i = idVersions.length - 1; i > 0; i--) {
int index = random.nextInt(i + 1);
IDAndVersion x = idVersions[index];
idVersions[index] = idVersions[i];
@ -620,7 +606,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
}
if (VERBOSE) {
for(IDAndVersion idVersion : idVersions) {
for (IDAndVersion idVersion : idVersions) {
System.out.println("id=" + idVersion.id + " version=" + idVersion.version + " delete?=" + idVersion.delete + " truth?=" + (truth.get(idVersion.id) == idVersion));
}
}
@ -629,7 +615,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
final CountDownLatch startingGun = new CountDownLatch(1);
Thread[] threads = new Thread[TestUtil.nextInt(random, 1, TEST_NIGHTLY ? 20 : 5)];
final long startTime = System.nanoTime();
for(int i=0;i<threads.length;i++) {
for (int i = 0; i < threads.length; i++) {
final int threadID = i;
threads[i] = new Thread() {
@Override
@ -653,7 +639,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
String id = idVersion.id;
idVersion.threadID = threadID;
idVersion.indexStartTime = System.nanoTime()-startTime;
idVersion.indexStartTime = System.nanoTime() - startTime;
long version = idVersion.version;
if (idVersion.delete) {
try {
@ -666,54 +652,32 @@ public class SimpleVersioningIT extends ESIntegTestCase {
idVersion.versionConflict = true;
}
} else {
for (int x=0;x<2;x++) {
// Try create first:
IndexRequest.OpType op;
if (x == 0) {
op = IndexRequest.OpType.CREATE;
} else {
op = IndexRequest.OpType.INDEX;
}
// index document
try {
idVersion.response = client().prepareIndex("test", "type", id)
.setSource("foo", "bar")
.setOpType(op)
.setVersion(version)
.setVersionType(VersionType.EXTERNAL).execute().actionGet();
break;
} catch (DocumentAlreadyExistsException daee) {
if (x == 0) {
// OK: id was already indexed by another thread, now use index:
idVersion.alreadyExists = true;
} else {
// Should not happen with op=INDEX:
throw daee;
}
.setVersion(version).setVersionType(VersionType.EXTERNAL).get();
} catch (VersionConflictEngineException vcee) {
// OK: our version is too old
assertThat(version, lessThanOrEqualTo(truth.get(id).version));
idVersion.versionConflict = true;
}
}
}
idVersion.indexFinishTime = System.nanoTime()-startTime;
idVersion.indexFinishTime = System.nanoTime() - startTime;
if (threadRandom.nextInt(100) == 7) {
System.out.println(threadID + ": TEST: now refresh at " + (System.nanoTime()-startTime));
System.out.println(threadID + ": TEST: now refresh at " + (System.nanoTime() - startTime));
refresh();
System.out.println(threadID + ": TEST: refresh done at " + (System.nanoTime()-startTime));
System.out.println(threadID + ": TEST: refresh done at " + (System.nanoTime() - startTime));
}
if (threadRandom.nextInt(100) == 7) {
System.out.println(threadID + ": TEST: now flush at " + (System.nanoTime()-startTime));
System.out.println(threadID + ": TEST: now flush at " + (System.nanoTime() - startTime));
try {
flush();
} catch (FlushNotAllowedEngineException fnaee) {
// OK
}
System.out.println(threadID + ": TEST: flush done at " + (System.nanoTime()-startTime));
System.out.println(threadID + ": TEST: flush done at " + (System.nanoTime() - startTime));
}
}
} catch (Exception e) {
@ -725,13 +689,13 @@ public class SimpleVersioningIT extends ESIntegTestCase {
}
startingGun.countDown();
for(Thread thread : threads) {
for (Thread thread : threads) {
thread.join();
}
// Verify against truth:
boolean failed = false;
for(String id : ids) {
for (String id : ids) {
long expected;
IDAndVersion idVersion = truth.get(id);
if (idVersion != null && idVersion.delete == false) {
@ -748,7 +712,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
if (failed) {
System.out.println("All versions:");
for(int i=0;i<idVersions.length;i++) {
for (int i = 0; i < idVersions.length; i++) {
System.out.println("i=" + i + " " + idVersions[i]);
}
fail("wrong versions for some IDs");
@ -770,7 +734,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
ensureGreen();
HashMap<String,Object> newSettings = new HashMap<>();
HashMap<String, Object> newSettings = new HashMap<>();
newSettings.put("index.gc_deletes", "10ms");
newSettings.put("index.refresh_interval", "-1");
client()
@ -842,7 +806,7 @@ public class SimpleVersioningIT extends ESIntegTestCase {
ensureGreen();
// We test deletes, but can't rely on wall-clock delete GC:
HashMap<String,Object> newSettings = new HashMap<>();
HashMap<String, Object> newSettings = new HashMap<>();
newSettings.put("index.gc_deletes", "0ms");
client()
.admin()

View File

@ -266,3 +266,8 @@ of string values: see `FilterFunctionScoreQuery.ScoreMode` and `CombineFunction`
For simplicity, only one way of adding the ids to the existing list (empty by default) is left: `addIds(String...)`
==== DocumentAlreadyExistsException removed
`DocumentAlreadyExistsException` is removed and a `VersionConflictException` is thrown instead (with a better
error description). This will influence code that use the `IndexRequest.opType()` or `IndexRequest.create()`
to index a document only if it doesn't already exist.

View File

@ -19,8 +19,6 @@
package org.elasticsearch.messy.tests;
import java.nio.charset.StandardCharsets;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.bulk.BulkItemResponse;
@ -49,21 +47,15 @@ import org.elasticsearch.script.groovy.GroovyPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CyclicBarrier;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertExists;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.*;
public class BulkTests extends ESIntegTestCase {
@ -190,8 +182,8 @@ public class BulkTests extends ESIntegTestCase {
assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(3l));
bulkResponse = client().prepareBulk()
.add(client().prepareIndex("test", "type", "e1").setCreate(true).setSource("field", "1").setVersion(10).setVersionType(VersionType.EXTERNAL))
.add(client().prepareIndex("test", "type", "e2").setCreate(true).setSource("field", "1").setVersion(10).setVersionType(VersionType.EXTERNAL))
.add(client().prepareIndex("test", "type", "e1").setSource("field", "1").setVersion(10).setVersionType(VersionType.EXTERNAL))
.add(client().prepareIndex("test", "type", "e2").setSource("field", "1").setVersion(10).setVersionType(VersionType.EXTERNAL))
.add(client().prepareIndex("test", "type", "e1").setSource("field", "2").setVersion(12).setVersionType(VersionType.EXTERNAL)).get();
assertTrue(((IndexResponse) bulkResponse.getItems()[0].getResponse()).isCreated());

View File

@ -44,12 +44,7 @@ import org.elasticsearch.script.groovy.GroovyPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
@ -58,12 +53,7 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
public class UpdateTests extends ESIntegTestCase {
@ -258,25 +248,26 @@ public class UpdateTests extends ESIntegTestCase {
.setVersionType(VersionType.EXTERNAL).execute(),
ActionRequestValidationException.class);
// With force version
client().prepareUpdate(indexOrAlias(), "type", "2")
.setScript(new Script("ctx._source.text = 'v10'", ScriptService.ScriptType.INLINE, null, null))
.setVersion(10).setVersionType(VersionType.FORCE).get();
GetResponse get = get("test", "type", "2");
assertThat(get.getVersion(), equalTo(10l));
assertThat((String) get.getSource().get("text"), equalTo("v10"));
// upserts - the combination with versions is a bit weird. Test are here to ensure we do not change our behavior unintentionally
// With internal versions, tt means "if object is there with version X, update it or explode. If it is not there, index.
client().prepareUpdate(indexOrAlias(), "type", "3")
.setScript(new Script("ctx._source.text = 'v2'", ScriptService.ScriptType.INLINE, null, null))
.setVersion(10).setUpsert("{ \"text\": \"v0\" }").get();
GetResponse get = get("test", "type", "3");
get = get("test", "type", "3");
assertThat(get.getVersion(), equalTo(1l));
assertThat((String) get.getSource().get("text"), equalTo("v0"));
// With force version
client().prepareUpdate(indexOrAlias(), "type", "4")
.setScript(new Script("ctx._source.text = 'v2'", ScriptService.ScriptType.INLINE, null, null))
.setVersion(10).setVersionType(VersionType.FORCE).setUpsert("{ \"text\": \"v0\" }").get();
get = get("test", "type", "4");
assertThat(get.getVersion(), equalTo(10l));
assertThat((String) get.getSource().get("text"), equalTo("v0"));
// retry on conflict is rejected:
assertThrows(client().prepareUpdate(indexOrAlias(), "type", "1").setVersion(10).setRetryOnConflict(5), ActionRequestValidationException.class);

View File

@ -1,33 +0,0 @@
---
"External version":
- do:
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external
version: 5
- match: { _version: 5}
- do:
catch: conflict
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external
version: 5
- do:
catch: conflict
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external
version: 6

View File

@ -1,33 +0,0 @@
---
"External version":
- do:
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external_gte
version: 5
- match: { _version: 5}
- do:
catch: conflict
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external_gte
version: 5
- do:
catch: conflict
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external_gte
version: 6

View File

@ -1,33 +0,0 @@
---
"External version":
- do:
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: force
version: 5
- match: { _version: 5}
- do:
catch: conflict
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: force
version: 5
- do:
catch: conflict
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: force
version: 6

View File

@ -2,7 +2,7 @@
"Internal version":
- do:
catch: conflict
catch: missing
update:
index: test_1
type: test
@ -10,7 +10,14 @@
version: 1
body:
doc: { foo: baz }
upsert: { foo: bar }
- do:
index:
index: test_1
type: test
id: 1
body:
doc: { foo: baz }
- do:
catch: conflict
@ -18,7 +25,6 @@
index: test_1
type: test
id: 1
version: 1
version: 2
body:
doc: { foo: baz }
upsert: { foo: bar }