From 178629382cae497aaa5b4842208cd570b189bd52 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 13 Jun 2013 19:37:33 +0200 Subject: [PATCH] Added version support to update requests Moved version handling from RobinEngine into VersionType. This avoids code re-use and makes it cleaner and easier to read. Closes #3111 --- .../action/bulk/BulkRequest.java | 5 +- .../action/index/IndexRequest.java | 3 +- .../action/update/UpdateHelper.java | 24 ++- .../action/update/UpdateRequest.java | 61 ++++++- .../action/update/UpdateRequestBuilder.java | 28 +++- .../org/elasticsearch/index/VersionType.java | 52 +++++- .../index/engine/robin/RobinEngine.java | 158 +++++------------- .../rest/action/update/RestUpdateAction.java | 3 + .../hamcrest/ElasticsearchAssertions.java | 40 +++-- .../AbstractSharedClusterTest.java | 7 +- .../test/integration/document/BulkTests.java | 67 +++++++- .../test/integration/update/UpdateTests.java | 74 +++++++- .../test/unit/index/VersionTypeTests.java | 115 +++++++++++++ 13 files changed, 472 insertions(+), 165 deletions(-) create mode 100644 src/test/java/org/elasticsearch/test/unit/index/VersionTypeTests.java diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index e58b2dcc032..3dd8d2f1ed8 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContentFactory; @@ -155,6 +156,7 @@ public class BulkRequest extends ActionRequest { } return this; } + /** * Adds an {@link DeleteRequest} to the list of actions to execute. */ @@ -272,7 +274,7 @@ public class BulkRequest extends ActionRequest { String timestamp = null; Long ttl = null; String opType = null; - long version = 0; + long version = Versions.MATCH_ANY; VersionType versionType = VersionType.INTERNAL; String percolate = null; int retryOnConflict = 0; @@ -345,6 +347,7 @@ public class BulkRequest extends ActionRequest { .percolate(percolate), payload); } else if ("update".equals(action)) { internalAdd(new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict) + .version(version).versionType(versionType) .source(data.slice(from, nextMarker - from)) .percolate(percolate), payload); } diff --git a/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 56aab87a8d8..b035263cea1 100644 --- a/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.internal.TimestampFieldMapper; @@ -130,7 +131,7 @@ public class IndexRequest extends ShardReplicationOperationRequest private OpType opType = OpType.INDEX; private boolean refresh = false; - private long version = 0; + private long version = Versions.MATCH_ANY; private VersionType versionType = VersionType.INTERNAL; private String percolate; diff --git a/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index c6161c79d66..476c3200137 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -14,8 +14,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.DocumentSourceMissingException; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetField; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.internal.ParentFieldMapper; @@ -78,9 +80,23 @@ public class UpdateHelper extends AbstractComponent { .refresh(request.refresh()) .replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); indexRequest.operationThreaded(false); + if (request.versionType() == VersionType.EXTERNAL) { + // in external versioning mode, we want to create the new document using the given version. + indexRequest.version(request.version()).versionType(VersionType.EXTERNAL); + } return new Result(indexRequest, Operation.UPSERT, null, null); } + if (request.versionType().isVersionConflict(getResult.getVersion(), request.version())) { + throw new VersionConflictEngineException(new ShardId(request.index(), request.shardId()), request.type(), request.id(), + getResult.getVersion(), request.version()); + } + + long updateVersion = getResult.getVersion(); + if (request.versionType() == VersionType.EXTERNAL) { + updateVersion = request.version(); // remember, match_any is excluded by the conflict test + } + if (getResult.internalSourceRef() == null) { // no source, we can't do nothing, through a failure... throw new DocumentSourceMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()); @@ -148,12 +164,11 @@ public class UpdateHelper extends AbstractComponent { } } - // TODO: external version type, does it make sense here? does not seem like it... - // TODO: because we use getResult.getVersion we loose the doc.version. The question is where is the right place? if (operation == null || "index".equals(operation)) { final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) .source(updatedSourceAsMap, updateSourceContentType) - .version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()) + .version(updateVersion).versionType(request.versionType()) + .replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()) .timestamp(timestamp).ttl(ttl) .percolate(request.percolate()) .refresh(request.refresh()); @@ -161,7 +176,8 @@ public class UpdateHelper extends AbstractComponent { return new Result(indexRequest, Operation.INDEX, updatedSourceAsMap, updateSourceContentType); } else if ("delete".equals(operation)) { DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) - .version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); + .version(updateVersion).versionType(request.versionType()) + .replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); deleteRequest.operationThreaded(false); return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType); } else if ("none".equals(operation)) { diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 6ddde7f15d8..ac23bbe59bc 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -31,10 +31,12 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; import java.io.IOException; import java.util.Map; @@ -59,7 +61,9 @@ public class UpdateRequest extends InstanceShardOperationRequest private String[] fields; - int retryOnConflict = 0; + private long version = Versions.MATCH_ANY; + private VersionType versionType = VersionType.INTERNAL; + private int retryOnConflict = 0; private String percolate; @@ -69,7 +73,7 @@ public class UpdateRequest extends InstanceShardOperationRequest private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; private IndexRequest upsertRequest; - + private boolean docAsUpsert = false; @Nullable @@ -94,14 +98,19 @@ public class UpdateRequest extends InstanceShardOperationRequest if (id == null) { validationException = addValidationError("id is missing", validationException); } + + if (version != Versions.MATCH_ANY && retryOnConflict > 0) { + validationException = addValidationError("can't provide both retry_on_conflict and a specific version", validationException); + } + if (script == null && doc == null) { validationException = addValidationError("script or doc is missing", validationException); } if (script != null && doc != null) { validationException = addValidationError("can't provide both script and doc", validationException); } - if(doc == null && docAsUpsert){ - validationException = addValidationError("can't say to upsert doc without providing doc", validationException); + if (doc == null && docAsUpsert) { + validationException = addValidationError("can't say to upsert doc without providing doc", validationException); } return validationException; } @@ -285,6 +294,31 @@ public class UpdateRequest extends InstanceShardOperationRequest return this.retryOnConflict; } + /** + * Sets the version, which will cause the index operation to only be performed if a matching + * version exists and no changes happened on the doc since then. + */ + public UpdateRequest version(long version) { + this.version = version; + return this; + } + + public long version() { + return this.version; + } + + /** + * Sets the versioning type. Defaults to {@link VersionType#INTERNAL}. + */ + public UpdateRequest versionType(VersionType versionType) { + this.versionType = versionType; + return this; + } + + public VersionType versionType() { + return this.versionType; + } + /** * Causes the update request document to be percolated. The parameter is the percolate query * to use to reduce the percolated queries that are going to run against this doc. Can be @@ -396,6 +430,14 @@ public class UpdateRequest extends InstanceShardOperationRequest return this; } + /** + * Sets the doc to use for updates when a script is not specified. + */ + public UpdateRequest doc(String field, Object value) { + safeDoc().source(field, value); + return this; + } + public IndexRequest doc() { return this.doc; } @@ -513,8 +555,8 @@ public class UpdateRequest extends InstanceShardOperationRequest XContentBuilder docBuilder = XContentFactory.contentBuilder(xContentType); docBuilder.copyCurrentStructure(parser); safeDoc().source(docBuilder); - } else if("doc_as_upsert".equals(currentFieldName)){ - docAsUpsert(parser.booleanValue()); + } else if ("doc_as_upsert".equals(currentFieldName)) { + docAsUpsert(parser.booleanValue()); } } } finally { @@ -526,9 +568,10 @@ public class UpdateRequest extends InstanceShardOperationRequest public boolean docAsUpsert() { return this.docAsUpsert; } + public void docAsUpsert(boolean shouldUpsertDoc) { this.docAsUpsert = shouldUpsertDoc; - if(this.doc != null && this.upsertRequest == null){ + if (this.doc != null && this.upsertRequest == null) { upsert(doc); } } @@ -565,6 +608,8 @@ public class UpdateRequest extends InstanceShardOperationRequest if (in.getVersion().onOrAfter(Version.V_0_90_2)) { docAsUpsert = in.readBoolean(); } + version = in.readLong(); + versionType = VersionType.fromValue(in.readByte()); } @Override @@ -612,6 +657,8 @@ public class UpdateRequest extends InstanceShardOperationRequest if (out.getVersion().onOrAfter(Version.V_0_90_2)) { out.writeBoolean(docAsUpsert); } + out.writeLong(version); + out.writeByte(versionType.getValue()); } } diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java index 022f7529cb7..eaa0449a28a 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java @@ -29,6 +29,7 @@ import org.elasticsearch.client.internal.InternalClient; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; import java.util.Map; @@ -124,6 +125,24 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuildertrue. Defaults @@ -216,6 +235,14 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder listener) { ((Client) client).update(request, listener); } - } diff --git a/src/main/java/org/elasticsearch/index/VersionType.java b/src/main/java/org/elasticsearch/index/VersionType.java index 05d66578dd6..4b73d51ace0 100644 --- a/src/main/java/org/elasticsearch/index/VersionType.java +++ b/src/main/java/org/elasticsearch/index/VersionType.java @@ -19,13 +19,47 @@ package org.elasticsearch.index; import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.common.lucene.uid.Versions; /** * */ public enum VersionType { - INTERNAL((byte) 0), - EXTERNAL((byte) 1); + INTERNAL((byte) 0) { + /** + * - always returns false if currentVersion == {@link Versions#NOT_SET} + * - always accepts expectedVersion == {@link Versions#MATCH_ANY} + * - if expectedVersion is set, always conflict if currentVersion == {@link Versions#NOT_FOUND} + */ + @Override + public boolean isVersionConflict(long currentVersion, long expectedVersion) { + return currentVersion != Versions.NOT_SET && expectedVersion != Versions.MATCH_ANY + && (currentVersion == Versions.NOT_FOUND || currentVersion != expectedVersion); + } + + @Override + public long updateVersion(long currentVersion, long expectedVersion) { + return (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1; + } + + }, + EXTERNAL((byte) 1) { + /** + * - always returns false if currentVersion == {@link Versions#NOT_SET} + * - always conflict if expectedVersion == {@link Versions#MATCH_ANY} (we need something to set) + * - accepts currentVersion == {@link Versions#NOT_FOUND} + */ + @Override + public boolean isVersionConflict(long currentVersion, long expectedVersion) { + return currentVersion != Versions.NOT_SET && currentVersion != Versions.NOT_FOUND + && (expectedVersion == Versions.MATCH_ANY || currentVersion >= expectedVersion); + } + + @Override + public long updateVersion(long currentVersion, long expectedVersion) { + return expectedVersion; + } + }; private final byte value; @@ -37,6 +71,20 @@ public enum VersionType { return value; } + /** + * Checks whether the current version conflicts with the expected version, based on the current version type. + * + * @return true if versions conflict false o.w. + */ + public abstract boolean isVersionConflict(long currentVersion, long expectedVersion); + + /** + * Returns the new version for a document, based on it's current one and the specified in the request + * + * @return new version + */ + public abstract long updateVersion(long currentVersion, long expectedVersion); + public static VersionType fromString(String versionType) { if ("internal".equals(versionType)) { return INTERNAL; diff --git a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 39050402730..31889a2b6d0 100644 --- a/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -394,49 +394,23 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { // same logic as index long updatedVersion; + long expectedVersion = create.version(); if (create.origin() == Operation.Origin.PRIMARY) { - if (create.versionType() == VersionType.INTERNAL) { // internal version type - long expectedVersion = create.version(); - if (expectedVersion != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) { - // an explicit version is provided, see if there is a conflict - // if we did not find anything, and a version is provided, so we do expect to find a doc under that version - // this is important, since we don't allow to preset a version in order to handle deletes - if (currentVersion == Versions.NOT_FOUND) { - throw new VersionConflictEngineException(shardId, create.type(), create.id(), Versions.NOT_FOUND, expectedVersion); - } else if (expectedVersion != currentVersion) { - throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion); - } - - } - updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1; - } else { // external version type - // an external version is provided, just check, if a local version exists, that its higher than it - // the actual version checking is one in an external system, and we just want to not index older versions - if (currentVersion >= 0) { // we can check!, its there - if (currentVersion >= create.version()) { - throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, create.version()); - } - } - updatedVersion = create.version(); + if (create.versionType().isVersionConflict(currentVersion, expectedVersion)) { + throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion); } + updatedVersion = create.versionType().updateVersion(currentVersion, expectedVersion); } else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) { - long expectedVersion = create.version(); - if (currentVersion != Versions.NOT_SET) { // we don't have a version, so ignore... - // if it does not exists, and its considered the first index operation (replicas/recovery are 1 of) - // then nothing to check - if (!(currentVersion == Versions.NOT_FOUND && create.version() == 1)) { - // with replicas/recovery, we only check for previous version, we allow to set a future version - if (expectedVersion <= currentVersion) { - if (create.origin() == Operation.Origin.RECOVERY) { - return; - } else { - throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion); - } - } + // replicas treat the version as "external" as it comes from the primary -> + // only exploding if the version they got is lower or equal to what they know. + if (VersionType.EXTERNAL.isVersionConflict(currentVersion, expectedVersion)) { + if (create.origin() == Operation.Origin.RECOVERY) { + return; + } else { + throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion); } } - // replicas already hold the "future" version - updatedVersion = create.version(); + updatedVersion = VersionType.EXTERNAL.updateVersion(currentVersion, expectedVersion); } // if the doc does not exists or it exists but not delete @@ -516,49 +490,25 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } long updatedVersion; + long expectedVersion = index.version(); if (index.origin() == Operation.Origin.PRIMARY) { - if (index.versionType() == VersionType.INTERNAL) { // internal version type - long expectedVersion = index.version(); - if (expectedVersion != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) { - // an explicit version is provided, see if there is a conflict - // if we did not find anything, and a version is provided, so we do expect to find a doc under that version - // this is important, since we don't allow to preset a version in order to handle deletes - if (currentVersion == Versions.NOT_FOUND) { - throw new VersionConflictEngineException(shardId, index.type(), index.id(), Versions.NOT_FOUND, expectedVersion); - } else if (expectedVersion != currentVersion) { - throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion); - } - } - updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1; + if (index.versionType().isVersionConflict(currentVersion, expectedVersion)) { + throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion); + } + + updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); - } else { // external version type - // an external version is provided, just check, if a local version exists, that its higher than it - // the actual version checking is one in an external system, and we just want to not index older versions - if (currentVersion >= 0) { // we can check!, its there - if (currentVersion >= index.version()) { - throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, index.version()); - } - } - updatedVersion = index.version(); - } } else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) { - long expectedVersion = index.version(); - if (currentVersion != Versions.NOT_SET) { // we don't have a version, so ignore... - // if it does not exists, and its considered the first index operation (replicas/recovery are 1 of) - // then nothing to check - if (!(currentVersion == Versions.NOT_FOUND && index.version() == 1)) { - // with replicas/recovery, we only check for previous version, we allow to set a future version - if (expectedVersion <= currentVersion) { - if (index.origin() == Operation.Origin.RECOVERY) { - return; - } else { - throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion); - } - } + // replicas treat the version as "external" as it comes from the primary -> + // only exploding if the version they got is lower or equal to what they know. + if (VersionType.EXTERNAL.isVersionConflict(currentVersion, expectedVersion)) { + if (index.origin() == Operation.Origin.RECOVERY) { + return; + } else { + throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion); } } - // replicas already hold the "future" version - updatedVersion = index.version(); + updatedVersion = VersionType.EXTERNAL.updateVersion(currentVersion, expectedVersion); } index.version(updatedVersion); @@ -571,7 +521,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { writer.addDocument(index.docs().get(0), index.analyzer()); } } else { - if (versionValue != null) index.created(versionValue.delete()); // we have a delete which is not GC'ed... + if (versionValue != null) { + index.created(versionValue.delete()); // we have a delete which is not GC'ed... + } if (index.docs().size() > 1) { writer.updateDocuments(index.uid(), index.docs(), index.analyzer()); } else { @@ -629,45 +581,25 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { } long updatedVersion; + long expectedVersion = delete.version(); if (delete.origin() == Operation.Origin.PRIMARY) { - if (delete.versionType() == VersionType.INTERNAL) { // internal version type - if (delete.version() != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) { // we don't have a version, so ignore... - // an explicit version is provided, see if there is a conflict - // if we did not find anything and a version is provided, so we do expect to find a doc under that version - if (currentVersion == Versions.NOT_FOUND) { - throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), Versions.NOT_FOUND, delete.version()); - } else if (delete.version() != currentVersion) { - throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, delete.version()); - } - } - updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1; + if (delete.versionType().isVersionConflict(currentVersion, expectedVersion)) { + throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, expectedVersion); + } + + updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); - } else { // External - if (currentVersion == Versions.NOT_FOUND) { - // its an external version, that's fine, we allow it to be set - //throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), UidField.DocIdAndVersion.Versions.NOT_FOUND, delete.version()); - } else if (currentVersion >= delete.version()) { - throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, delete.version()); - } - updatedVersion = delete.version(); - } } else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) { - // on replica, the version is the future value expected (returned from the operation on the primary) - if (currentVersion != Versions.NOT_SET) { // we don't have a version in the index, ignore - // only check if we have a version for it, otherwise, ignore (see later) - if (currentVersion != Versions.NOT_FOUND) { - // with replicas, we only check for previous version, we allow to set a future version - if (delete.version() <= currentVersion) { - if (delete.origin() == Operation.Origin.RECOVERY) { - return; - } else { - throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion - 1, delete.version()); - } - } + // replicas treat the version as "external" as it comes from the primary -> + // only exploding if the version they got is lower or equal to what they know. + if (VersionType.EXTERNAL.isVersionConflict(currentVersion, expectedVersion)) { + if (delete.origin() == Operation.Origin.RECOVERY) { + return; + } else { + throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion - 1, expectedVersion); } } - // replicas already hold the "future" version - updatedVersion = delete.version(); + updatedVersion = VersionType.EXTERNAL.updateVersion(currentVersion, expectedVersion); } if (currentVersion == Versions.NOT_FOUND) { @@ -1056,7 +988,9 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine { snapshotIndexCommit = deletionPolicy.snapshot(); traslogSnapshot = translog.snapshot(); } catch (Exception e) { - if (snapshotIndexCommit != null) snapshotIndexCommit.release(); + if (snapshotIndexCommit != null) { + snapshotIndexCommit.release(); + } throw new SnapshotFailedEngineException(shardId, e); } finally { rwl.readLock().unlock(); diff --git a/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java b/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java index 748642f4a9e..32a524b1ad9 100644 --- a/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java +++ b/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java @@ -86,6 +86,9 @@ public class RestUpdateAction extends BaseRestHandler { } } updateRequest.retryOnConflict(request.paramAsInt("retry_on_conflict", updateRequest.retryOnConflict())); + updateRequest.version(RestActions.parseVersion(request)); + updateRequest.versionType(VersionType.fromString(request.param("version_type"), updateRequest.versionType())); + // see if we have it in the body if (request.hasContent()) { diff --git a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 616d650d37a..b0a653eedbf 100644 --- a/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/src/test/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -20,9 +20,10 @@ package org.elasticsearch.test.hamcrest; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.Query; -import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; import org.elasticsearch.search.SearchHit; @@ -65,14 +66,14 @@ public class ElasticsearchAssertions { public static void assertSearchHit(SearchResponse searchResponse, int number, Matcher matcher) { assert number > 0; assertThat("SearchHit number must be greater than 0", number, greaterThan(0)); - assertThat(searchResponse.getHits().totalHits(), greaterThanOrEqualTo((long)number)); - assertSearchHit(searchResponse.getHits().getAt(number-1), matcher); + assertThat(searchResponse.getHits().totalHits(), greaterThanOrEqualTo((long) number)); + assertSearchHit(searchResponse.getHits().getAt(number - 1), matcher); } - + public static void assertNoFailures(SearchResponse searchResponse) { assertThat("Unexpectd ShardFailures: " + Arrays.toString(searchResponse.getShardFailures()), searchResponse.getShardFailures().length, equalTo(0)); } - + public static void assertNoFailures(BroadcastOperationResponse response) { assertThat("Unexpectd ShardFailures: " + Arrays.toString(response.getShardFailures()), response.getFailedShards(), equalTo(0)); } @@ -88,16 +89,16 @@ public class ElasticsearchAssertions { assertThat(resp.getHits().hits()[hit].getHighlightFields().get(field).fragments().length, greaterThan(fragment)); assertThat(resp.getHits().hits()[hit].highlightFields().get(field).fragments()[fragment].string(), matcher); } - + public static void assertSuggestionSize(Suggest searchSuggest, int entry, int size, String key) { assertThat(searchSuggest, notNullValue()); - assertThat(searchSuggest.size(),greaterThanOrEqualTo(1)); + assertThat(searchSuggest.size(), greaterThanOrEqualTo(1)); assertThat(searchSuggest.getSuggestion(key).getName(), equalTo(key)); assertThat(searchSuggest.getSuggestion(key).getEntries().size(), greaterThanOrEqualTo(entry)); assertThat(searchSuggest.getSuggestion(key).getEntries().get(entry).getOptions().size(), equalTo(size)); } - + public static void assertSuggestion(Suggest searchSuggest, int entry, int ord, String key, String text) { assertThat(searchSuggest, notNullValue()); assertThat(searchSuggest.size(), greaterThanOrEqualTo(1)); @@ -121,31 +122,34 @@ public class ElasticsearchAssertions { public static Matcher hasIndex(final String index) { return new ElasticsearchMatchers.SearchHitHasIndexMatcher(index); } - + public static T assertBooleanSubQuery(Query query, Class subqueryType, int i) { assertThat(query, instanceOf(BooleanQuery.class)); BooleanQuery q = (BooleanQuery) query; assertThat(q.getClauses().length, greaterThan(i)); assertThat(q.getClauses()[i].getQuery(), instanceOf(subqueryType)); - return (T)q.getClauses()[i].getQuery(); + return (T) q.getClauses()[i].getQuery(); } - public static void assertThrows(ActionFuture future, Class exceptionClass) { - boolean fail=false; + public static void assertThrows(ActionRequestBuilder builder, Class exceptionClass) { + assertThrows(builder.execute(), exceptionClass); + } + + public static void assertThrows(ActionFuture future, Class exceptionClass) { + boolean fail = false; try { future.actionGet(); - fail=true; + fail = true; - } - catch (ElasticSearchException esException) { + } catch (ElasticSearchException esException) { assertThat(esException.unwrapCause(), instanceOf(exceptionClass)); - } - catch (Throwable e) { + } catch (Throwable e) { assertThat(e, instanceOf(exceptionClass)); } // has to be outside catch clause to get a proper message - if (fail) + if (fail) { throw new AssertionError("Expected a " + exceptionClass + " exception to be thrown"); + } } } diff --git a/src/test/java/org/elasticsearch/test/integration/AbstractSharedClusterTest.java b/src/test/java/org/elasticsearch/test/integration/AbstractSharedClusterTest.java index 9fb1587b17a..ce6363fac47 100644 --- a/src/test/java/org/elasticsearch/test/integration/AbstractSharedClusterTest.java +++ b/src/test/java/org/elasticsearch/test/integration/AbstractSharedClusterTest.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRespon import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.optimize.OptimizeResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.broadcast.BroadcastOperationRequestBuilder; import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse; @@ -46,7 +47,6 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.merge.policy.AbstractMergePolicyProvider; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.indices.IndexTemplateMissingException; @@ -58,7 +58,6 @@ import org.testng.annotations.BeforeMethod; import java.io.IOException; import java.util.HashSet; import java.util.Iterator; -import java.util.Random; import java.util.Set; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -319,6 +318,10 @@ public abstract class AbstractSharedClusterTest extends ElasticsearchTestCase { return client().prepareIndex(index, type).setSource(source).execute().actionGet(); } + protected GetResponse get(String index, String type, String id) { + return client().prepareGet(index, type, id).execute().actionGet(); + } + protected IndexResponse index(String index, String type, String id, String field, Object value) { return client().prepareIndex(index, type, id).setSource(field, value).execute().actionGet(); } diff --git a/src/test/java/org/elasticsearch/test/integration/document/BulkTests.java b/src/test/java/org/elasticsearch/test/integration/document/BulkTests.java index 1991ced6af1..4046c5da44a 100644 --- a/src/test/java/org/elasticsearch/test/integration/document/BulkTests.java +++ b/src/test/java/org/elasticsearch/test/integration/document/BulkTests.java @@ -1,21 +1,23 @@ package org.elasticsearch.test.integration.document; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.nullValue; - import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.index.VersionType; import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.testng.annotations.Test; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + /** */ public class BulkTests extends AbstractSharedClusterTest { @@ -105,6 +107,53 @@ public class BulkTests extends AbstractSharedClusterTest { assertThat(((Long) getResponse.getField("field").getValue()), equalTo(4l)); } + @Test + public void testBulkVersioning() throws Exception { + createIndex("test"); + ensureGreen(); + BulkResponse bulkResponse = run(client().prepareBulk() + .add(client().prepareIndex("test", "type", "1").setCreate(true).setSource("field", "1")) + .add(client().prepareIndex("test", "type", "2").setCreate(true).setSource("field", "1")) + .add(client().prepareIndex("test", "type", "1").setSource("field", "2"))); + + assertTrue(((IndexResponse) bulkResponse.getItems()[0].getResponse()).isCreated()); + assertThat(((IndexResponse) bulkResponse.getItems()[0].getResponse()).getVersion(), equalTo(1l)); + assertTrue(((IndexResponse) bulkResponse.getItems()[1].getResponse()).isCreated()); + assertThat(((IndexResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(1l)); + assertFalse(((IndexResponse) bulkResponse.getItems()[2].getResponse()).isCreated()); + assertThat(((IndexResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(2l)); + + bulkResponse = run(client().prepareBulk() + .add(client().prepareUpdate("test", "type", "1").setVersion(4l).setDoc("field", "2")) + .add(client().prepareUpdate("test", "type", "2").setDoc("field", "2")) + .add(client().prepareUpdate("test", "type", "1").setVersion(2l).setDoc("field", "3"))); + + assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("Version")); + assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(2l)); + assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(3l)); + + bulkResponse = run(client().prepareBulk() + .add(client().prepareIndex("test", "type", "e1").setCreate(true).setSource("field", "1").setVersion(10).setVersionType(VersionType.EXTERNAL)) + .add(client().prepareIndex("test", "type", "e2").setCreate(true).setSource("field", "1").setVersion(10).setVersionType(VersionType.EXTERNAL)) + .add(client().prepareIndex("test", "type", "e1").setSource("field", "2").setVersion(12).setVersionType(VersionType.EXTERNAL))); + + assertTrue(((IndexResponse) bulkResponse.getItems()[0].getResponse()).isCreated()); + assertThat(((IndexResponse) bulkResponse.getItems()[0].getResponse()).getVersion(), equalTo(10l)); + assertTrue(((IndexResponse) bulkResponse.getItems()[1].getResponse()).isCreated()); + assertThat(((IndexResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(10l)); + assertFalse(((IndexResponse) bulkResponse.getItems()[2].getResponse()).isCreated()); + assertThat(((IndexResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(12l)); + + bulkResponse = run(client().prepareBulk() + .add(client().prepareUpdate("test", "type", "e1").setVersion(4l).setDoc("field", "2").setVersion(10).setVersionType(VersionType.EXTERNAL)) + .add(client().prepareUpdate("test", "type", "e2").setDoc("field", "2").setVersion(15).setVersionType(VersionType.EXTERNAL)) + .add(client().prepareUpdate("test", "type", "e1").setVersion(2l).setDoc("field", "3").setVersion(15).setVersionType(VersionType.EXTERNAL))); + + assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("Version")); + assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(15l)); + assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(15l)); + } + @Test public void testBulkUpdate_malformedScripts() throws Exception { client().admin().indices().prepareDelete().execute().actionGet(); @@ -140,7 +189,7 @@ public class BulkTests extends AbstractSharedClusterTest { assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getId(), equalTo("2")); assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(2l)); - assertThat(((Integer)((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getGetResult().field("field").getValue()), equalTo(2)); + assertThat(((Integer) ((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getGetResult().field("field").getValue()), equalTo(2)); assertThat(bulkResponse.getItems()[1].getFailure(), nullValue()); assertThat(bulkResponse.getItems()[2].getFailure().getId(), equalTo("3")); @@ -182,7 +231,7 @@ public class BulkTests extends AbstractSharedClusterTest { assertThat(response.getItems()[i].getOpType(), equalTo("update")); assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getId(), equalTo(Integer.toString(i))); assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getVersion(), equalTo(1l)); - assertThat(((Integer)((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue()), equalTo(1)); + assertThat(((Integer) ((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue()), equalTo(1)); for (int j = 0; j < 5; j++) { GetResponse getResponse = client().prepareGet("test", "type1", Integer.toString(i)).setFields("counter").execute().actionGet(); @@ -219,7 +268,7 @@ public class BulkTests extends AbstractSharedClusterTest { assertThat(response.getItems()[i].getOpType(), equalTo("update")); assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getId(), equalTo(Integer.toString(i))); assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getVersion(), equalTo(2l)); - assertThat(((Integer)((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue()), equalTo(2)); + assertThat(((Integer) ((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue()), equalTo(2)); } builder = client().prepareBulk(); diff --git a/src/test/java/org/elasticsearch/test/integration/update/UpdateTests.java b/src/test/java/org/elasticsearch/test/integration/update/UpdateTests.java index 27ff0b171a3..758eb2a3a49 100644 --- a/src/test/java/org/elasticsearch/test/integration/update/UpdateTests.java +++ b/src/test/java/org/elasticsearch/test/integration/update/UpdateTests.java @@ -29,7 +29,9 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.Priority; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.DocumentMissingException; +import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.testng.annotations.Test; @@ -39,6 +41,7 @@ import java.util.concurrent.CountDownLatch; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; import static org.testng.AssertJUnit.*; @@ -163,22 +166,23 @@ public class UpdateTests extends AbstractSharedClusterTest { assertThat(getResponse.getSourceAsMap().get("field").toString(), equalTo("2")); } } + @Test public void testUpsertDoc() throws Exception { - createIndex(); + createIndex(); ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - + UpdateResponse updateResponse = client().prepareUpdate("test", "type1", "1") - .setDoc(XContentFactory.jsonBuilder().startObject().field("bar", "baz").endObject()) - .setDocAsUpsert(true) + .setDoc(XContentFactory.jsonBuilder().startObject().field("bar", "baz").endObject()) + .setDocAsUpsert(true) .setFields("_source") .execute().actionGet(); assertThat(updateResponse.getGetResult(), notNullValue()); assertThat(updateResponse.getGetResult().sourceAsMap().get("bar").toString(), equalTo("baz")); } - + @Test public void testUpsertFields() throws Exception { createIndex(); @@ -207,6 +211,60 @@ public class UpdateTests extends AbstractSharedClusterTest { assertThat(updateResponse.getGetResult().sourceAsMap().get("extra").toString(), equalTo("foo")); } + @Test + public void testVersionedUpdate() throws Exception { + createIndex("test"); + ensureGreen(); + + index("test", "type", "1", "text", "value"); // version is now 1 + + assertThrows(client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v2'").setVersion(2).execute(), + VersionConflictEngineException.class); + + run(client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v2'").setVersion(1)); + assertThat(run(client().prepareGet("test", "type", "1")).getVersion(), equalTo(2l)); + + // and again with a higher version.. + run(client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v3'").setVersion(2)); + + assertThat(run(client().prepareGet("test", "type", "1")).getVersion(), equalTo(3l)); + + // after delete + run(client().prepareDelete("test", "type", "1")); + assertThrows(client().prepareUpdate("test", "type", "1").setScript("ctx._source.text = 'v2'").setVersion(3).execute(), + DocumentMissingException.class); + + // external versioning + run(client().prepareIndex("test", "type", "2").setSource("text", "value").setVersion(10).setVersionType(VersionType.EXTERNAL)); + assertThrows(client().prepareUpdate("test", "type", "2").setScript("ctx._source.text = 'v2'").setVersion(2).setVersionType(VersionType.EXTERNAL).execute(), + VersionConflictEngineException.class); + + run(client().prepareUpdate("test", "type", "2").setScript("ctx._source.text = 'v2'").setVersion(11).setVersionType(VersionType.EXTERNAL)); + + assertThat(run(client().prepareGet("test", "type", "2")).getVersion(), equalTo(11l)); + + // upserts - the combination with versions is a bit weird. Test are here to ensure we do not change our behavior unintentionally + + // With internal versions, tt means "if object is there with version X, update it or explode. If it is not there, index. + run(client().prepareUpdate("test", "type", "3").setScript("ctx._source.text = 'v2'").setVersion(10).setUpsertRequest("{ \"text\": \"v0\" }")); + GetResponse get = get("test", "type", "3"); + assertThat(get.getVersion(), equalTo(1l)); + assertThat((String) get.getSource().get("text"), equalTo("v0")); + + // With external versions, it means - if object is there with version lower than X, update it or explode. If it is not there, insert with new version. + run(client().prepareUpdate("test", "type", "4").setScript("ctx._source.text = 'v2'"). + setVersion(10).setVersionType(VersionType.EXTERNAL).setUpsertRequest("{ \"text\": \"v0\" }")); + get = get("test", "type", "4"); + assertThat(get.getVersion(), equalTo(10l)); + assertThat((String) get.getSource().get("text"), equalTo("v0")); + + + // retry on conflict is rejected: + + assertThrows(client().prepareUpdate("test", "type", "1").setVersion(10).setRetryOnConflict(5), ActionRequestValidationException.class); + + } + @Test public void testIndexAutoCreation() throws Exception { try { @@ -390,10 +448,10 @@ public class UpdateTests extends AbstractSharedClusterTest { assertThat(e.getMessage(), containsString("can't provide both script and doc")); } } - + @Test - public void testUpdateRequestWithScriptAndShouldUpsertDoc() throws Exception{ - createIndex(); + public void testUpdateRequestWithScriptAndShouldUpsertDoc() throws Exception { + createIndex(); ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); diff --git a/src/test/java/org/elasticsearch/test/unit/index/VersionTypeTests.java b/src/test/java/org/elasticsearch/test/unit/index/VersionTypeTests.java new file mode 100644 index 00000000000..d46c1b6f6a6 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/unit/index/VersionTypeTests.java @@ -0,0 +1,115 @@ +package org.elasticsearch.test.unit.index; +/* + * Licensed to ElasticSearch under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.index.VersionType; +import org.testng.annotations.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class VersionTypeTests { + @Test + public void testInternalVersionConflict() throws Exception { + + assertFalse(VersionType.INTERNAL.isVersionConflict(10, Versions.MATCH_ANY)); + // if we don't have a version in the index we accept everything + assertFalse(VersionType.INTERNAL.isVersionConflict(Versions.NOT_SET, 10)); + assertFalse(VersionType.INTERNAL.isVersionConflict(Versions.NOT_SET, Versions.MATCH_ANY)); + + // if we didn't find a version (but the index does support it), we don't like it unless MATCH_ANY + assertTrue(VersionType.INTERNAL.isVersionConflict(Versions.NOT_FOUND, Versions.NOT_FOUND)); + assertTrue(VersionType.INTERNAL.isVersionConflict(Versions.NOT_FOUND, 10)); + assertFalse(VersionType.INTERNAL.isVersionConflict(Versions.NOT_FOUND, Versions.MATCH_ANY)); + + // and the stupid usual case + assertFalse(VersionType.INTERNAL.isVersionConflict(10, 10)); + assertTrue(VersionType.INTERNAL.isVersionConflict(9, 10)); + assertTrue(VersionType.INTERNAL.isVersionConflict(10, 9)); + +// Old indexing code, dictating behavior +// if (expectedVersion != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) { +// // an explicit version is provided, see if there is a conflict +// // if we did not find anything, and a version is provided, so we do expect to find a doc under that version +// // this is important, since we don't allow to preset a version in order to handle deletes +// if (currentVersion == Versions.NOT_FOUND) { +// throw new VersionConflictEngineException(shardId, index.type(), index.id(), Versions.NOT_FOUND, expectedVersion); +// } else if (expectedVersion != currentVersion) { +// throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion); +// } +// } +// updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1; + } + + @Test + public void testExternalVersionConflict() throws Exception { + + assertFalse(VersionType.EXTERNAL.isVersionConflict(Versions.NOT_FOUND, 10)); + assertFalse(VersionType.EXTERNAL.isVersionConflict(Versions.NOT_SET, 10)); + // MATCH_ANY must throw an exception in the case of external version, as the version must be set! it used as the new value + assertTrue(VersionType.EXTERNAL.isVersionConflict(10, Versions.MATCH_ANY)); + + // if we didn't find a version (but the index does support it), we always accept + assertFalse(VersionType.EXTERNAL.isVersionConflict(Versions.NOT_FOUND, Versions.NOT_FOUND)); + assertFalse(VersionType.EXTERNAL.isVersionConflict(Versions.NOT_FOUND, 10)); + assertFalse(VersionType.EXTERNAL.isVersionConflict(Versions.NOT_FOUND, Versions.MATCH_ANY)); + + // and the standard behavior + assertTrue(VersionType.EXTERNAL.isVersionConflict(10, 10)); + assertFalse(VersionType.EXTERNAL.isVersionConflict(9, 10)); + assertTrue(VersionType.EXTERNAL.isVersionConflict(10, 9)); + + +// Old indexing code, dictating behavior +// // an external version is provided, just check, if a local version exists, that its higher than it +// // the actual version checking is one in an external system, and we just want to not index older versions +// if (currentVersion >= 0) { // we can check!, its there +// if (currentVersion >= index.version()) { +// throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, index.version()); +// } +// } +// updatedVersion = index.version(); + } + + + @Test + public void testUpdateVersion() { + + assertThat(VersionType.INTERNAL.updateVersion(Versions.NOT_SET, 10), equalTo(1l)); + assertThat(VersionType.INTERNAL.updateVersion(Versions.NOT_FOUND, 10), equalTo(1l)); + assertThat(VersionType.INTERNAL.updateVersion(1, 1), equalTo(2l)); + assertThat(VersionType.INTERNAL.updateVersion(2, Versions.MATCH_ANY), equalTo(3l)); + + + assertThat(VersionType.EXTERNAL.updateVersion(Versions.NOT_SET, 10), equalTo(10l)); + assertThat(VersionType.EXTERNAL.updateVersion(Versions.NOT_FOUND, 10), equalTo(10l)); + assertThat(VersionType.EXTERNAL.updateVersion(1, 10), equalTo(10l)); + +// Old indexing code +// if (index.versionType() == VersionType.INTERNAL) { // internal version type +// updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1; +// } else { // external version type +// updatedVersion = expectedVersion; +// } + } +}