Introduced VersionType.FORCE & VersionType.EXTERNAL_GTE

Also added "external_gt" as an alias name for VersionType.EXTERNAL , accessible for the rest layer.

Closes #4213 , Closes #2946
This commit is contained in:
Boaz Leskes 2014-01-31 14:38:21 +01:00
parent bf8d8dc33e
commit b7a95d11a7
31 changed files with 723 additions and 120 deletions

View File

@ -91,8 +91,7 @@ chunks, as this will slow things down.
Each bulk item can include the version value using the
`_version`/`version` field. It automatically follows the behavior of the
index / delete operation based on the `_version` mapping. It also
support the `version_type`/`_version_type` when using `external`
versioning.
support the `version_type`/`_version_type` (see <<index-versioning, versioning>>)
[float]
[[bulk-routing]]

View File

@ -126,6 +126,32 @@ a database is simplified if external versioning is used, as only the
latest version will be used if the index operations are out of order for
whatever reason.
[float]
==== Version types
Next to the `internal` & `external` version types explained above, Elasticsearch
also supports other types for specific use cases. Here is an overview of
the different version types and their semantics.
`internal`:: only index the document if the given version is identical to the version
of the stored document.
`external` or `external_gt`:: only index the document if the given version is strictly higher
than the version of the stored document *or* if there is no existing document. The given
version will be used as the new version and will be stored with the new document.
`external_gte`:: only index the document if the given version is *equal* or higher
than the version of the stored document. If there is no existing document
the operation will succeed as well. The given version will be used as the new version
and will be stored with the new document.
`force`:: the document will be indexed regardless of the version of the stored document or if there
is no existing document. The given version will be used as the new version and will be stored
with the new document. This version type is typically used for correcting errors.
*NOTE*: The `external_gte` & `force` version types are meant for special use cases and should be used
with care. If used incorrectly, they can result in loss of data.
[float]
[[operation-type]]
=== Operation Type

View File

@ -278,7 +278,6 @@ document.
* `percolate_routing` - The routing value to use when percolating the existing document.
* `percolate_preference` - Which shard to prefer when executing the percolate request.
* `version` - Enables a version check. If the fetched document's version isn't equal to the specified version then the request fails with a version conflict and the percolation request is aborted.
* `version_type` - Whether internal or external versioning is used. Defaults to internal versioning.
Internally the percolate api will issue a get request for fetching the`_source` of the document to percolate.
For this feature to work the `_source` for documents to be percolated need to be stored.

View File

@ -59,7 +59,7 @@
},
"version_type": {
"type": "enum",
"options": ["internal", "external"],
"options": ["internal", "external", "external_gte", "force"],
"description": "Specific version type"
}
}

View File

@ -56,7 +56,7 @@
},
"version_type": {
"type" : "enum",
"options" : ["internal","external"],
"options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type"
}
}

View File

@ -65,7 +65,7 @@
},
"version_type": {
"type" : "enum",
"options" : ["internal","external"],
"options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type"
}
}

View File

@ -61,7 +61,7 @@
},
"version_type": {
"type" : "enum",
"options" : ["internal","external"],
"options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type"
}
}

View File

@ -69,7 +69,7 @@
},
"version_type": {
"type" : "enum",
"options" : ["internal","external"],
"options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type"
}
}

View File

@ -59,7 +59,7 @@
},
"version_type": {
"type" : "enum",
"options" : ["internal","external"],
"options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type"
}
}

View File

@ -79,7 +79,7 @@
},
"version_type": {
"type" : "enum",
"options" : ["internal","external"],
"options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type"
}
}

View File

@ -0,0 +1,33 @@
---
"External version":
- do:
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external_gte
version: 5
- match: { _version: 5}
- do:
catch: conflict
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external_gte
version: 5
- do:
catch: conflict
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external_gte
version: 6

View File

@ -0,0 +1,33 @@
---
"External version":
- do:
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: force
version: 5
- match: { _version: 5}
- do:
catch: conflict
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: force
version: 5
- do:
catch: conflict
create:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: force
version: 6

View File

@ -0,0 +1,53 @@
---
"External GTE version":
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external_gte
version: 5
- match: { _version: 5}
- do:
catch: conflict
delete:
index: test_1
type: test
id: 1
version_type: external_gte
version: 4
- do:
delete:
index: test_1
type: test
id: 1
version_type: external_gte
version: 6
- match: { _version: 6}
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external_gte
version: 6
- match: { _version: 6}
- do:
delete:
index: test_1
type: test
id: 1
version_type: external_gte
version: 6
- match: { _version: 6}

View File

@ -0,0 +1,44 @@
---
"Force version":
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: force
version: 5
- match: { _version: 5}
- do:
delete:
index: test_1
type: test
id: 1
version_type: force
version: 4
- match: { _version: 4}
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: force
version: 6
- match: { _version: 6}
- do:
delete:
index: test_1
type: test
id: 1
version_type: force
version: 6
- match: { _version: 6}

View File

@ -0,0 +1,45 @@
---
"External GTE version":
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external_gte
version: 5
- match: { _version: 5}
- do:
catch: conflict
index:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external_gte
version: 4
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar2 }
version_type: external_gte
version: 5
- match: { _version: 5}
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar2 }
version_type: external_gte
version: 6
- match: { _version: 6}

View File

@ -0,0 +1,46 @@
---
"Force version":
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: force
version: 5
- match: { _version: 5}
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: force
version: 4
- match: { _version: 4}
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar2 }
version_type: force
version: 5
- match: { _version: 5}
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar3 }
version_type: force
version: 5
- match: { _version: 5}

View File

@ -50,6 +50,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
@ -145,16 +146,18 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
long[] preVersions = new long[request.items().length];
VersionType[] preVersionTypes = new VersionType[request.items().length];
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
BulkItemRequest item = request.items()[requestIndex];
if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request();
preVersions[requestIndex] = indexRequest.version();
preVersionTypes[requestIndex] = indexRequest.versionType();
try {
WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true);
// add the response
IndexResponse indexResponse = result.response();
responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse);
preVersions[requestIndex] = result.preVersion;
if (result.mappingToUpdate != null) {
if (mappingsToUpdate == null) {
mappingsToUpdate = Sets.newHashSet();
@ -172,7 +175,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (retryPrimaryException(e)) {
// restore updated versions...
for (int j = 0; j < requestIndex; j++) {
applyVersion(request.items()[j], preVersions[j]);
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
}
throw (ElasticsearchException) e;
}
@ -188,6 +191,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
preVersions[requestIndex] = deleteRequest.version();
preVersionTypes[requestIndex] = deleteRequest.versionType();
try {
// add the response
DeleteResponse deleteResponse = shardDeleteOperation(deleteRequest, indexShard).response();
@ -197,7 +203,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (retryPrimaryException(e)) {
// restore updated versions...
for (int j = 0; j < requestIndex; j++) {
applyVersion(request.items()[j], preVersions[j]);
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
}
throw (ElasticsearchException) e;
}
@ -213,6 +219,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
} else if (item.request() instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) item.request();
preVersions[requestIndex] = updateRequest.version();
preVersionTypes[requestIndex] = updateRequest.versionType();
// We need to do the requested retries plus the initial attempt. We don't do < 1+retry_on_conflict because retry_on_conflict may be Integer.MAX_VALUE
for (int updateAttemptsCount = 0; updateAttemptsCount <= updateRequest.retryOnConflict(); updateAttemptsCount++) {
UpdateResult updateResult;
@ -237,7 +245,6 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
}
responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResponse);
preVersions[requestIndex] = result.preVersion;
if (result.mappingToUpdate != null) {
if (mappingsToUpdate == null) {
mappingsToUpdate = Sets.newHashSet();
@ -286,7 +293,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (retryPrimaryException(t)) {
// restore updated versions...
for (int j = 0; j < requestIndex; j++) {
applyVersion(request.items()[j], preVersions[j]);
applyVersion(request.items()[j], preVersions[j], preVersionTypes[j]);
}
throw (ElasticsearchException) t;
}
@ -328,6 +335,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
assert responses[requestIndex] != null; // we must have set a response somewhere.
assert preVersionTypes[requestIndex] != null;
}
@ -351,13 +359,11 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
static class WriteResult {
final Object response;
final long preVersion;
final Tuple<String, String> mappingToUpdate;
final Engine.IndexingOperation op;
WriteResult(Object response, long preVersion, Tuple<String, String> mappingToUpdate, Engine.IndexingOperation op) {
WriteResult(Object response, Tuple<String, String> mappingToUpdate, Engine.IndexingOperation op) {
this.response = response;
this.preVersion = preVersion;
this.mappingToUpdate = mappingToUpdate;
this.op = op;
}
@ -403,10 +409,12 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
op = create;
created = true;
}
long preVersion = indexRequest.version();
// update the version on request so it will happen on the replicas
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.version(version);
assert indexRequest.versionType().validateVersion(indexRequest.version());
// update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added
Tuple<String, String> mappingsToUpdate = null;
if (op.parsedDoc().mappingsModified()) {
@ -414,16 +422,20 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
IndexResponse indexResponse = new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version, created);
return new WriteResult(indexResponse, preVersion, mappingsToUpdate, op);
return new WriteResult(indexResponse, mappingsToUpdate, op);
}
private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the request with the version so it will go to the replicas
deleteRequest.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(delete.version());
assert deleteRequest.versionType().validateVersion(deleteRequest.version());
DeleteResponse deleteResponse = new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.found());
return new WriteResult(deleteResponse, deleteRequest.version(), null, null);
return new WriteResult(deleteResponse, null, null);
}
static class UpdateResult {
@ -532,10 +544,14 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
Engine.Index index = indexShard.prepareIndex(sourceToParse)
.version(indexRequest.version()).versionType(indexRequest.versionType())
.origin(Engine.Operation.Origin.REPLICA);
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);
Engine.Create create = indexShard.prepareCreate(sourceToParse)
.version(indexRequest.version()).versionType(indexRequest.versionType())
.origin(Engine.Operation.Origin.REPLICA);
indexShard.create(create);
}
} catch (Throwable e) {
@ -544,7 +560,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
try {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).origin(Engine.Operation.Origin.REPLICA);
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version())
.versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.REPLICA);
indexShard.delete(delete);
} catch (Throwable e) {
// ignore, we are on backup
@ -596,11 +613,13 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
}
private void applyVersion(BulkItemRequest item, long version) {
private void applyVersion(BulkItemRequest item, long version, VersionType versionType) {
if (item.request() instanceof IndexRequest) {
((IndexRequest) item.request()).version(version);
((IndexRequest) item.request()).version(version).versionType(versionType);
} else if (item.request() instanceof DeleteRequest) {
((DeleteRequest) item.request()).version(version);
((DeleteRequest) item.request()).version(version).versionType();
} else if (item.request() instanceof UpdateRequest) {
((UpdateRequest) item.request()).version(version).versionType();
} else {
// log?
}

View File

@ -94,6 +94,9 @@ public class DeleteRequest extends ShardReplicationOperationRequest<DeleteReques
if (id == null) {
validationException = addValidationError("id is missing", validationException);
}
if (!versionType.validateVersion(version)) {
validationException = addValidationError("illegal version value [" + version + "] for version type ["+ versionType.name() + "]", validationException);
}
return validationException;
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.delete;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@ -40,6 +41,7 @@ import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
@ -107,6 +109,11 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
MappingMetaData mappingMd = state.metaData().index(request.index()).mappingOrDefault(request.type());
if (mappingMd != null && mappingMd.routing().required()) {
if (request.routing() == null) {
if (request.versionType() != VersionType.INTERNAL) {
// TODO: implement this feature
throw new ElasticsearchIllegalArgumentException("routing value is required for deleting documents of type [" + request.type()
+ "] while using version_type [" + request.versionType());
}
indexDeleteAction.execute(new IndexDeleteRequest(request), new ActionListener<IndexDeleteResponse>() {
@Override
public void onResponse(IndexDeleteResponse indexDeleteResponse) {
@ -183,8 +190,11 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
.origin(Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the request with teh version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
request.version(delete.version());
assert request.versionType().validateVersion(request.version());
if (request.refresh()) {
try {
indexShard.refresh(new Engine.Refresh("refresh_flag_delete").force(false));
@ -201,7 +211,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()).versionType(request.versionType())
.origin(Engine.Operation.Origin.REPLICA);
indexShard.delete(delete);

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
@ -117,6 +118,12 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.REPLICA);
// IndexDeleteAction doesn't support version type at the moment. Hard coded for the INTERNAL version
delete.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery());
assert delete.versionType().validateVersion(delete.version());
indexShard.delete(delete);
if (request.refresh()) {

View File

@ -177,6 +177,9 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
if (source == null) {
validationException = addValidationError("source is missing", validationException);
}
if (!versionType.validateVersion(version)) {
validationException = addValidationError("illegal version value [" + version + "] for version type ["+ versionType.name() + "]", validationException);
}
return validationException;
}

View File

@ -234,6 +234,9 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
// update the version on the request, so it will be used for the replicas
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
assert request.versionType().validateVersion(request.version());
IndexResponse response = new IndexResponse(request.index(), request.type(), request.id(), version, created);
return new PrimaryResponse<IndexResponse, IndexRequest>(shardRequest.request, response, op);
@ -247,12 +250,12 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse)
.version(request.version())
.version(request.version()).versionType(request.versionType())
.origin(Engine.Operation.Origin.REPLICA);
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse)
.version(request.version())
.version(request.version()).versionType(request.versionType())
.origin(Engine.Operation.Origin.REPLICA);
indexShard.create(create);
}

View File

@ -98,15 +98,15 @@ public class UpdateHelper extends AbstractComponent {
.refresh(request.refresh())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
indexRequest.operationThreaded(false);
if (request.versionType() == VersionType.EXTERNAL) {
// in external versioning mode, we want to create the new document using the given version.
indexRequest.version(request.version()).versionType(VersionType.EXTERNAL);
if (request.versionType() != VersionType.INTERNAL) {
// in all but the internal versioning mode, we want to create the new document using the given version.
indexRequest.version(request.version()).versionType(request.versionType());
}
return new Result(indexRequest, Operation.UPSERT, null, null);
}
long updateVersion = getResult.getVersion();
if (request.versionType() == VersionType.EXTERNAL) {
if (request.versionType() != VersionType.INTERNAL) {
updateVersion = request.version(); // remember, match_any is excluded by the conflict test
}

View File

@ -100,6 +100,10 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
validationException = addValidationError("can't provide both retry_on_conflict and a specific version", validationException);
}
if (!versionType.validateVersion(version)) {
validationException = addValidationError("illegal version value [" + version + "] for version type ["+ versionType.name() + "]", validationException);
}
if (script == null && doc == null) {
validationException = addValidationError("script or doc is missing", validationException);
}

View File

@ -26,15 +26,21 @@ import org.elasticsearch.common.lucene.uid.Versions;
*/
public enum VersionType {
INTERNAL((byte) 0) {
/**
* - always returns false if currentVersion == {@link Versions#NOT_SET}
* - always accepts expectedVersion == {@link Versions#MATCH_ANY}
* - if expectedVersion is set, always conflict if currentVersion == {@link Versions#NOT_FOUND}
*/
@Override
public boolean isVersionConflict(long currentVersion, long expectedVersion) {
return currentVersion != Versions.NOT_SET && expectedVersion != Versions.MATCH_ANY
&& (currentVersion == Versions.NOT_FOUND || currentVersion != expectedVersion);
if (currentVersion == Versions.NOT_SET) {
return false;
}
if (expectedVersion == Versions.MATCH_ANY) {
return false;
}
if (currentVersion == Versions.NOT_FOUND) {
return true;
}
if (currentVersion != expectedVersion) {
return true;
}
return false;
}
@Override
@ -42,8 +48,48 @@ public enum VersionType {
return (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
}
@Override
public boolean validateVersion(long version) {
// not allowing Versions.NOT_FOUND as it is not a valid input value.
return version > 0L || version == Versions.MATCH_ANY;
}
@Override
public VersionType versionTypeForReplicationAndRecovery() {
// replicas get the version from the primary after increment. The same version is stored in
// the transaction log. -> the should use the external semantics.
return EXTERNAL;
}
},
EXTERNAL((byte) 1) {
@Override
public boolean isVersionConflict(long currentVersion, long expectedVersion) {
if (currentVersion == Versions.NOT_SET) {
return false;
}
if (currentVersion == Versions.NOT_FOUND) {
return false;
}
if (expectedVersion == Versions.MATCH_ANY) {
return true;
}
if (currentVersion >= expectedVersion) {
return true;
}
return false;
}
@Override
public long updateVersion(long currentVersion, long expectedVersion) {
return expectedVersion;
}
@Override
public boolean validateVersion(long version) {
return version > 0L;
}
},
EXTERNAL_GTE((byte) 2) {
/**
* - always returns false if currentVersion == {@link Versions#NOT_SET}
* - always conflict if expectedVersion == {@link Versions#MATCH_ANY} (we need something to set)
@ -51,14 +97,63 @@ public enum VersionType {
*/
@Override
public boolean isVersionConflict(long currentVersion, long expectedVersion) {
return currentVersion != Versions.NOT_SET && currentVersion != Versions.NOT_FOUND
&& (expectedVersion == Versions.MATCH_ANY || currentVersion >= expectedVersion);
if (currentVersion == Versions.NOT_SET) {
return false;
}
if (currentVersion == Versions.NOT_FOUND) {
return false;
}
if (expectedVersion == Versions.MATCH_ANY) {
return true;
}
if (currentVersion > expectedVersion) {
return true;
}
return false;
}
@Override
public long updateVersion(long currentVersion, long expectedVersion) {
return expectedVersion;
}
@Override
public boolean validateVersion(long version) {
return version > 0L;
}
},
/**
* Warning: this version type should be used with care. Concurrent indexing may result in loss of data on replicas
*/
FORCE((byte) 3) {
/**
* - always returns false if currentVersion == {@link Versions#NOT_SET}
* - always conflict if expectedVersion == {@link Versions#MATCH_ANY} (we need something to set)
* - accepts currentVersion == {@link Versions#NOT_FOUND}
*/
@Override
public boolean isVersionConflict(long currentVersion, long expectedVersion) {
if (currentVersion == Versions.NOT_SET) {
return false;
}
if (currentVersion == Versions.NOT_FOUND) {
return false;
}
if (expectedVersion == Versions.MATCH_ANY) {
return true;
}
return false;
}
@Override
public long updateVersion(long currentVersion, long expectedVersion) {
return expectedVersion;
}
@Override
public boolean validateVersion(long version) {
return version > 0L;
}
};
private final byte value;
@ -85,11 +180,29 @@ public enum VersionType {
*/
public abstract long updateVersion(long currentVersion, long expectedVersion);
/** validate the version is a valid value for this type.
* @return true if valid, false o.w
*/
public abstract boolean validateVersion(long version);
/** Some version types require different semantics for primary and replicas. This version allows
* the type to override the default behavior.
*/
public VersionType versionTypeForReplicationAndRecovery() {
return this;
}
public static VersionType fromString(String versionType) {
if ("internal".equals(versionType)) {
return INTERNAL;
} else if ("external".equals(versionType)) {
return EXTERNAL;
} else if ("external_gt".equals(versionType)) {
return EXTERNAL;
} else if ("external_gte".equals(versionType)) {
return EXTERNAL_GTE;
} else if ("force".equals(versionType)) {
return FORCE;
}
throw new ElasticsearchIllegalArgumentException("No version type match [" + versionType + "]");
}
@ -98,12 +211,7 @@ public enum VersionType {
if (versionType == null) {
return defaultVersionType;
}
if ("internal".equals(versionType)) {
return INTERNAL;
} else if ("external".equals(versionType)) {
return EXTERNAL;
}
throw new ElasticsearchIllegalArgumentException("No version type match [" + versionType + "]");
return fromString(versionType);
}
public static VersionType fromValue(byte value) {
@ -111,6 +219,10 @@ public enum VersionType {
return INTERNAL;
} else if (value == 1) {
return EXTERNAL;
} else if (value == 2) {
return EXTERNAL_GTE;
} else if (value == 3) {
return FORCE;
}
throw new ElasticsearchIllegalArgumentException("No version type match [" + value + "]");
}

View File

@ -48,7 +48,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
@ -417,23 +416,14 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
// same logic as index
long updatedVersion;
long expectedVersion = create.version();
if (create.origin() == Operation.Origin.PRIMARY) {
if (create.versionType().isVersionConflict(currentVersion, expectedVersion)) {
if (create.versionType().isVersionConflict(currentVersion, expectedVersion)) {
if (create.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
}
updatedVersion = create.versionType().updateVersion(currentVersion, expectedVersion);
} else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) {
// replicas treat the version as "external" as it comes from the primary ->
// only exploding if the version they got is lower or equal to what they know.
if (VersionType.EXTERNAL.isVersionConflict(currentVersion, expectedVersion)) {
if (create.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, create.type(), create.id(), currentVersion, expectedVersion);
}
}
updatedVersion = VersionType.EXTERNAL.updateVersion(currentVersion, expectedVersion);
}
updatedVersion = create.versionType().updateVersion(currentVersion, expectedVersion);
// if the doc does not exists or it exists but not delete
if (versionValue != null) {
@ -513,25 +503,15 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
long updatedVersion;
long expectedVersion = index.version();
if (index.origin() == Operation.Origin.PRIMARY) {
if (index.versionType().isVersionConflict(currentVersion, expectedVersion)) {
if (index.versionType().isVersionConflict(currentVersion, expectedVersion)) {
if (index.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
}
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
} else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) {
// replicas treat the version as "external" as it comes from the primary ->
// only exploding if the version they got is lower or equal to what they know.
if (VersionType.EXTERNAL.isVersionConflict(currentVersion, expectedVersion)) {
if (index.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
}
}
updatedVersion = VersionType.EXTERNAL.updateVersion(currentVersion, expectedVersion);
}
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.version(updatedVersion);
if (currentVersion == Versions.NOT_FOUND) {
@ -604,25 +584,14 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
long updatedVersion;
long expectedVersion = delete.version();
if (delete.origin() == Operation.Origin.PRIMARY) {
if (delete.versionType().isVersionConflict(currentVersion, expectedVersion)) {
if (delete.versionType().isVersionConflict(currentVersion, expectedVersion)) {
if (delete.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion, expectedVersion);
}
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
} else { // if (index.origin() == Operation.Origin.REPLICA || index.origin() == Operation.Origin.RECOVERY) {
// replicas treat the version as "external" as it comes from the primary ->
// only exploding if the version they got is lower or equal to what they know.
if (VersionType.EXTERNAL.isVersionConflict(currentVersion, expectedVersion)) {
if (delete.origin() == Operation.Origin.RECOVERY) {
return;
} else {
throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), currentVersion - 1, expectedVersion);
}
}
updatedVersion = VersionType.EXTERNAL.updateVersion(currentVersion, expectedVersion);
}
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
if (currentVersion == Versions.NOT_FOUND) {
// doc does not exists and no prior deletes

View File

@ -735,19 +735,22 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
case CREATE:
Translog.Create create = (Translog.Create) operation;
engine.create(prepareCreate(source(create.source()).type(create.type()).id(create.id())
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl())).version(create.version())
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()))
.version(create.version()).versionType(create.versionType().versionTypeForReplicationAndRecovery())
.origin(Engine.Operation.Origin.RECOVERY));
break;
case SAVE:
Translog.Index index = (Translog.Index) operation;
engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl())).version(index.version())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()))
.version(index.version()).versionType(index.versionType().versionTypeForReplicationAndRecovery())
.origin(Engine.Operation.Origin.RECOVERY));
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;
Uid uid = Uid.createUid(delete.uid().text());
engine.delete(new Engine.Delete(uid.type(), uid.id(), delete.uid()).version(delete.version())
engine.delete(new Engine.Delete(uid.type(), uid.id(), delete.uid())
.version(delete.version()).versionType(delete.versionType().versionTypeForReplicationAndRecovery())
.origin(Engine.Operation.Origin.RECOVERY));
break;
case DELETE_BY_QUERY:

View File

@ -29,8 +29,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.CloseableIndexComponent;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShardComponent;
@ -247,6 +249,8 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
}
static class Create implements Operation {
public static final int SERIALIZATION_FORMAT = 6;
private String id;
private String type;
private BytesReference source;
@ -254,7 +258,8 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
private String parent;
private long timestamp;
private long ttl;
private long version;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
public Create() {
}
@ -268,6 +273,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
this.timestamp = create.timestamp();
this.ttl = create.ttl();
this.version = create.version();
this.versionType = create.versionType();
}
public Create(String type, String id, byte[] source) {
@ -318,6 +324,10 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
return this.version;
}
public VersionType versionType() {
return versionType;
}
@Override
public Source readSource(StreamInput in) throws IOException {
readFrom(in);
@ -349,11 +359,16 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
if (version >= 5) {
this.ttl = in.readLong();
}
if (version >= 6) {
this.versionType = VersionType.fromValue(in.readByte());
}
assert versionType.validateVersion(version);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(5); // version
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(id);
out.writeString(type);
out.writeBytesReference(source);
@ -372,13 +387,17 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
out.writeLong(version);
out.writeLong(timestamp);
out.writeLong(ttl);
out.writeByte(versionType.getValue());
}
}
static class Index implements Operation {
public static final int SERIALIZATION_FORMAT = 6;
private String id;
private String type;
private long version;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private BytesReference source;
private String routing;
private String parent;
@ -397,6 +416,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
this.version = index.version();
this.timestamp = index.timestamp();
this.ttl = index.ttl();
this.versionType = index.versionType();
}
public Index(String type, String id, byte[] source) {
@ -447,6 +467,10 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
return this.version;
}
public VersionType versionType() {
return versionType;
}
@Override
public Source readSource(StreamInput in) throws IOException {
readFrom(in);
@ -478,11 +502,16 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
if (version >= 5) {
this.ttl = in.readLong();
}
if (version >= 6) {
this.versionType = VersionType.fromValue(in.readByte());
}
assert versionType.validateVersion(version);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(5); // version
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(id);
out.writeString(type);
out.writeBytesReference(source);
@ -501,12 +530,16 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
out.writeLong(version);
out.writeLong(timestamp);
out.writeLong(ttl);
out.writeByte(versionType.getValue());
}
}
static class Delete implements Operation {
public static final int SERIALIZATION_FORMAT = 2;
private Term uid;
private long version;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
public Delete() {
}
@ -514,6 +547,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
public Delete(Engine.Delete delete) {
this(delete.uid());
this.version = delete.version();
this.versionType = delete.versionType();
}
public Delete(Term uid) {
@ -538,6 +572,10 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
return this.version;
}
public VersionType versionType() {
return this.versionType;
}
@Override
public Source readSource(StreamInput in) throws IOException {
throw new ElasticsearchIllegalStateException("trying to read doc source from delete operation");
@ -550,18 +588,26 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
if (version >= 1) {
this.version = in.readLong();
}
if (version >= 2) {
this.versionType = VersionType.fromValue(in.readByte());
}
assert versionType.validateVersion(version);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(1); // version
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(uid.field());
out.writeString(uid.text());
out.writeLong(version);
out.writeByte(versionType.getValue());
}
}
static class DeleteByQuery implements Operation {
public static final int SERIALIZATION_FORMAT = 2;
private BytesReference source;
@Nullable
private String[] filteringAliases;
@ -637,7 +683,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(2); // version
out.writeVInt(SERIALIZATION_FORMAT);
out.writeBytesReference(source);
out.writeVInt(types.length);
for (String type : types) {

View File

@ -58,6 +58,25 @@ public class VersionTypeTests extends ElasticsearchTestCase {
// updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
}
@Test
public void testVersionValidation() {
assertTrue(VersionType.EXTERNAL.validateVersion(randomIntBetween(1,Integer.MAX_VALUE)));
assertFalse(VersionType.EXTERNAL.validateVersion(0)); // MATCH_ANY
assertFalse(VersionType.EXTERNAL.validateVersion(randomIntBetween(Integer.MIN_VALUE, 0)));
assertTrue(VersionType.EXTERNAL_GTE.validateVersion(randomIntBetween(1,Integer.MAX_VALUE)));
assertFalse(VersionType.EXTERNAL_GTE.validateVersion(0)); // MATCH_ANY
assertFalse(VersionType.EXTERNAL_GTE.validateVersion(randomIntBetween(Integer.MIN_VALUE, 0)));
assertTrue(VersionType.FORCE.validateVersion(randomIntBetween(1,Integer.MAX_VALUE)));
assertFalse(VersionType.FORCE.validateVersion(0)); // MATCH_ANY
assertFalse(VersionType.FORCE.validateVersion(randomIntBetween(Integer.MIN_VALUE, 0)));
assertTrue(VersionType.INTERNAL.validateVersion(randomIntBetween(1,Integer.MAX_VALUE)));
assertTrue(VersionType.INTERNAL.validateVersion(0)); // MATCH_ANY
assertFalse(VersionType.INTERNAL.validateVersion(randomIntBetween(Integer.MIN_VALUE, 0)));
}
@Test
public void testExternalVersionConflict() throws Exception {
@ -88,6 +107,43 @@ public class VersionTypeTests extends ElasticsearchTestCase {
// updatedVersion = index.version();
}
@Test
public void testExternalGTEVersionConflict() throws Exception {
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflict(Versions.NOT_FOUND, 10));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflict(Versions.NOT_SET, 10));
// MATCH_ANY must throw an exception in the case of external version, as the version must be set! it used as the new value
assertTrue(VersionType.EXTERNAL_GTE.isVersionConflict(10, Versions.MATCH_ANY));
// if we didn't find a version (but the index does support it), we always accept
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflict(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflict(Versions.NOT_FOUND, 10));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflict(Versions.NOT_FOUND, Versions.MATCH_ANY));
// and the standard behavior
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflict(10, 10));
assertFalse(VersionType.EXTERNAL_GTE.isVersionConflict(9, 10));
assertTrue(VersionType.EXTERNAL_GTE.isVersionConflict(10, 9));
}
@Test
public void testForceVersionConflict() throws Exception {
assertFalse(VersionType.FORCE.isVersionConflict(Versions.NOT_FOUND, 10));
assertFalse(VersionType.FORCE.isVersionConflict(Versions.NOT_SET, 10));
// MATCH_ANY must throw an exception in the case of external version, as the version must be set! it used as the new value
assertTrue(VersionType.FORCE.isVersionConflict(10, Versions.MATCH_ANY));
// if we didn't find a version (but the index does support it), we always accept
assertFalse(VersionType.FORCE.isVersionConflict(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertFalse(VersionType.FORCE.isVersionConflict(Versions.NOT_FOUND, 10));
assertFalse(VersionType.FORCE.isVersionConflict(Versions.NOT_FOUND, Versions.MATCH_ANY));
// and the standard behavior
assertFalse(VersionType.FORCE.isVersionConflict(10, 10));
assertFalse(VersionType.FORCE.isVersionConflict(9, 10));
assertFalse(VersionType.FORCE.isVersionConflict(10, 9));
}
@Test
public void testUpdateVersion() {

View File

@ -755,7 +755,8 @@ public class InternalEngineTests extends ElasticsearchTestCase {
engine.create(create);
assertThat(create.version(), equalTo(1l));
create = new Engine.Create(null, newUid("1"), doc).version(create.version()).origin(REPLICA);
create = new Engine.Create(null, newUid("1"), doc).version(create.version())
.versionType(create.versionType().versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.create(create);
assertThat(create.version(), equalTo(1l));
}
@ -767,7 +768,8 @@ public class InternalEngineTests extends ElasticsearchTestCase {
engine.create(create);
assertThat(create.version(), equalTo(12l));
create = new Engine.Create(null, newUid("1"), doc).version(create.version()).origin(REPLICA);
create = new Engine.Create(null, newUid("1"), doc).version(create.version())
.versionType(create.versionType().versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.create(create);
assertThat(create.version(), equalTo(12l));
}
@ -779,7 +781,8 @@ public class InternalEngineTests extends ElasticsearchTestCase {
engine.index(index);
assertThat(index.version(), equalTo(1l));
index = new Engine.Index(null, newUid("1"), doc).version(index.version()).origin(REPLICA);
index = new Engine.Index(null, newUid("1"), doc).version(index.version())
.versionType(index.versionType().versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.index(index);
assertThat(index.version(), equalTo(1l));
}
@ -791,7 +794,8 @@ public class InternalEngineTests extends ElasticsearchTestCase {
engine.index(index);
assertThat(index.version(), equalTo(12l));
index = new Engine.Index(null, newUid("1"), doc).version(index.version()).origin(REPLICA);
index = new Engine.Index(null, newUid("1"), doc)
.version(index.version()).versionType(index.versionType().versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.index(index);
assertThat(index.version(), equalTo(12l));
}
@ -1052,12 +1056,13 @@ public class InternalEngineTests extends ElasticsearchTestCase {
assertThat(index.version(), equalTo(2l));
// apply the second index to the replica, should work fine
index = new Engine.Index(null, newUid("1"), doc).version(2l).origin(REPLICA);
index = new Engine.Index(null, newUid("1"), doc).version(index.version())
.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.index(index);
assertThat(index.version(), equalTo(2l));
// now, the old one should not work
index = new Engine.Index(null, newUid("1"), doc).version(1l).origin(REPLICA);
index = new Engine.Index(null, newUid("1"), doc).version(1l).versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA);
try {
replicaEngine.index(index);
fail();
@ -1067,7 +1072,8 @@ public class InternalEngineTests extends ElasticsearchTestCase {
// second version on replica should fail as well
try {
index = new Engine.Index(null, newUid("1"), doc).version(2l).origin(REPLICA);
index = new Engine.Index(null, newUid("1"), doc).version(2l)
.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.index(index);
assertThat(index.version(), equalTo(2l));
} catch (VersionConflictEngineException e) {
@ -1083,7 +1089,8 @@ public class InternalEngineTests extends ElasticsearchTestCase {
assertThat(index.version(), equalTo(1l));
// apply the first index to the replica, should work fine
index = new Engine.Index(null, newUid("1"), doc).version(1l).origin(REPLICA);
index = new Engine.Index(null, newUid("1"), doc).version(1l)
.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.index(index);
assertThat(index.version(), equalTo(1l));
@ -1098,24 +1105,27 @@ public class InternalEngineTests extends ElasticsearchTestCase {
assertThat(delete.version(), equalTo(3l));
// apply the delete on the replica (skipping the second index)
delete = new Engine.Delete("test", "1", newUid("1")).version(3l).origin(REPLICA);
delete = new Engine.Delete("test", "1", newUid("1")).version(3l)
.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.delete(delete);
assertThat(delete.version(), equalTo(3l));
// second time delete with same version should fail
try {
delete = new Engine.Delete("test", "1", newUid("1")).version(3l).origin(REPLICA);
delete = new Engine.Delete("test", "1", newUid("1")).version(3l)
.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.delete(delete);
assertThat(delete.version(), equalTo(3l));
fail("excepted VersionConflictEngineException to be thrown");
} catch (VersionConflictEngineException e) {
// all is well
}
// now do the second index on the replica, it should fail
try {
index = new Engine.Index(null, newUid("1"), doc).version(2l).origin(REPLICA);
index = new Engine.Index(null, newUid("1"), doc).version(2l)
.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery()).origin(REPLICA);
replicaEngine.index(index);
assertThat(index.version(), equalTo(2l));
fail("excepted VersionConflictEngineException to be thrown");
} catch (VersionConflictEngineException e) {
// all is well
}

View File

@ -61,6 +61,84 @@ public class SimpleVersioningTests extends ElasticsearchIntegrationTest {
assertThat(indexResponse.getVersion(), equalTo(18L));
}
@Test
public void testForce() throws Exception {
createIndex("test");
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(12).setVersionType(VersionType.FORCE).get();
assertThat(indexResponse.getVersion(), equalTo(12l));
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(12).setVersionType(VersionType.FORCE).get();
assertThat(indexResponse.getVersion(), equalTo(12l));
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(14).setVersionType(VersionType.FORCE).get();
assertThat(indexResponse.getVersion(), equalTo(14l));
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(13).setVersionType(VersionType.FORCE).get();
assertThat(indexResponse.getVersion(), equalTo(13l));
client().admin().indices().prepareRefresh().execute().actionGet();
if (randomBoolean()) {
refresh();
}
for (int i = 0; i < 10; i++) {
assertThat(client().prepareGet("test", "type", "1").get().getVersion(), equalTo(13l));
}
// deleting with a lower version works.
long v= randomIntBetween(12,14);
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(v).setVersionType(VersionType.FORCE).get();
assertThat(deleteResponse.isFound(), equalTo(true));
assertThat(deleteResponse.getVersion(), equalTo(v));
}
@Test
public void testExternalGTE() throws Exception {
createIndex("test");
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(12).setVersionType(VersionType.EXTERNAL_GTE).get();
assertThat(indexResponse.getVersion(), equalTo(12l));
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(12).setVersionType(VersionType.EXTERNAL_GTE).get();
assertThat(indexResponse.getVersion(), equalTo(12l));
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(14).setVersionType(VersionType.EXTERNAL_GTE).get();
assertThat(indexResponse.getVersion(), equalTo(14l));
assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(13).setVersionType(VersionType.EXTERNAL_GTE),
VersionConflictEngineException.class);
client().admin().indices().prepareRefresh().execute().actionGet();
if (randomBoolean()) {
refresh();
}
for (int i = 0; i < 10; i++) {
assertThat(client().prepareGet("test", "type", "1").get().getVersion(), equalTo(14l));
}
// deleting with a lower version fails.
assertThrows(
client().prepareDelete("test", "type", "1").setVersion(2).setVersionType(VersionType.EXTERNAL_GTE),
VersionConflictEngineException.class);
// Delete with a higher or equal version deletes all versions up to the given one.
long v= randomIntBetween(14,17);
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(v).setVersionType(VersionType.EXTERNAL_GTE).execute().actionGet();
assertThat(deleteResponse.isFound(), equalTo(true));
assertThat(deleteResponse.getVersion(), equalTo(v));
// Deleting with a lower version keeps on failing after a delete.
assertThrows(
client().prepareDelete("test", "type", "1").setVersion(2).setVersionType(VersionType.EXTERNAL_GTE).execute(),
VersionConflictEngineException.class);
// But delete with a higher version is OK.
deleteResponse = client().prepareDelete("test", "type", "1").setVersion(18).setVersionType(VersionType.EXTERNAL_GTE).execute().actionGet();
assertThat(deleteResponse.isFound(), equalTo(false));
assertThat(deleteResponse.getVersion(), equalTo(18l));
}
@Test
public void testExternalVersioning() throws Exception {
createIndex("test");
@ -75,7 +153,9 @@ public class SimpleVersioningTests extends ElasticsearchIntegrationTest {
assertThrows(client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(13).setVersionType(VersionType.EXTERNAL).execute(),
VersionConflictEngineException.class);
client().admin().indices().prepareRefresh().execute().actionGet();
if (randomBoolean()) {
refresh();
}
for (int i = 0; i < 10; i++) {
assertThat(client().prepareGet("test", "type", "1").execute().actionGet().getVersion(), equalTo(14l));
}