Allow 0 as a valid external version

Until now, all version types have officially required the version to be a positive long number. Despite this being documented, ES versions <=1.0 did not enforce it when using the `external` version type. As a result, people have successfully indexed documents with 0 as a version. In 1.1 we introduced validation checks on incoming version values, causing indexing requests to fail if the version was set to 0. While this is strictly speaking OK, we effectively have a situation where data already indexed does not match the version invariant.

To be lenient and adhere to the spirit of our data backward compatibility policy, we have decided to allow 0 as a valid external version. This is somewhat complicated as 0 is also the internal value of `MATCH_ANY`, which indicates requests should succeed regardless of the current doc version. To keep things simple, this commit changes the internal value of `MATCH_ANY` to `-3` for all version types.

Since we're doing this in a minor release (and because versions are stored in the transaction log), the default `internal` version type still accepts 0 as a `MATCH_ANY` value. This is not a problem for other version types as `MATCH_ANY` doesn't make sense in that context.

Closes #5662
This commit is contained in:
Boaz Leskes 2014-05-01 13:12:00 +02:00
parent f510e25306
commit 9f10547f4b
17 changed files with 140 additions and 42 deletions

View File

@ -109,7 +109,7 @@ By default, internal versioning is used that starts at 1 and increments
with each update, deletes included. Optionally, the version number can be
supplemented with an external value (for example, if maintained in a
database). To enable this functionality, `version_type` should be set to
`external`. The value provided must be a numeric, long value greater than 0,
`external`. The value provided must be a numeric, long value greater than or equal to 0,
and less than around 9.2e+18. When using the external version type, instead
of checking for a matching version number, the system checks to see if
the version number passed to the index request is greater than the
@ -138,12 +138,13 @@ of the stored document.
`external` or `external_gt`:: only index the document if the given version is strictly higher
than the version of the stored document *or* if there is no existing document. The given
version will be used as the new version and will be stored with the new document.
version will be used as the new version and will be stored with the new document. The supplied
version must be a non-negative long number.
`external_gte`:: only index the document if the given version is *equal* or higher
than the version of the stored document. If there is no existing document
the operation will succeed as well. The given version will be used as the new version
and will be stored with the new document.
and will be stored with the new document. The supplied version must be a non-negative long number.
`force`:: the document will be indexed regardless of the version of the stored document or if there
is no existing document. The given version will be used as the new version and will be stored

View File

@ -1,6 +1,17 @@
---
"External version":
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external
version: 0
- match: { _version: 0 }
- do:
index:
index: test_1
@ -10,7 +21,7 @@
version_type: external
version: 5
- match: { _version: 5}
- match: { _version: 5 }
- do:
catch: conflict
@ -22,6 +33,16 @@
version_type: external
version: 5
- do:
catch: conflict
index:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external
version: 0
- do:
index:
index: test_1

View File

@ -1,6 +1,17 @@
---
"External GTE version":
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
version_type: external_gte
version: 0
- match: { _version: 0}
- do:
index:
index: test_1
@ -20,7 +31,7 @@
id: 1
body: { foo: bar }
version_type: external_gte
version: 4
version: 0
- do:
index:

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.support.replication.ShardReplicationOperationReq
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.VersionType;
import java.io.IOException;
@ -205,7 +206,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest<DeleteReques
id = in.readString();
routing = in.readOptionalString();
refresh = in.readBoolean();
version = in.readLong();
version = Versions.readVersion(in);
versionType = VersionType.fromValue(in.readByte());
}
@ -216,7 +217,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest<DeleteReques
out.writeString(id);
out.writeOptionalString(routing());
out.writeBoolean(refresh);
out.writeLong(version);
Versions.writeVersion(version, out);
out.writeByte(versionType.getValue());
}

View File

@ -112,7 +112,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
if (request.versionType() != VersionType.INTERNAL) {
// TODO: implement this feature
throw new ElasticsearchIllegalArgumentException("routing value is required for deleting documents of type [" + request.type()
+ "] while using version_type [" + request.versionType());
+ "] while using version_type [" + request.versionType() + "]");
}
indexDeleteAction.execute(new IndexDeleteRequest(request), new ActionListener<IndexDeleteResponse>() {
@Override

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import java.io.IOException;
@ -72,7 +73,7 @@ public class IndexDeleteRequest extends IndexReplicationOperationRequest<IndexDe
type = in.readString();
id = in.readString();
refresh = in.readBoolean();
version = in.readLong();
version = Versions.readVersion(in);
}
@Override
@ -81,6 +82,6 @@ public class IndexDeleteRequest extends IndexReplicationOperationRequest<IndexDe
out.writeString(type);
out.writeString(id);
out.writeBoolean(refresh);
out.writeLong(version);
Versions.writeVersion(version, out);
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import java.io.IOException;
@ -98,7 +99,7 @@ public class ShardDeleteRequest extends ShardReplicationOperationRequest<ShardDe
type = in.readString();
id = in.readString();
refresh = in.readBoolean();
version = in.readLong();
version = Versions.readVersion(in);
}
@Override
@ -108,6 +109,6 @@ public class ShardDeleteRequest extends ShardReplicationOperationRequest<ShardDe
out.writeString(type);
out.writeString(id);
out.writeBoolean(refresh);
out.writeLong(version);
Versions.writeVersion(version, out);
}
}

View File

@ -267,7 +267,7 @@ public class GetRequest extends SingleShardOperationRequest<GetRequest> {
}
this.versionType = VersionType.fromValue(in.readByte());
this.version = in.readVLong();
this.version = Versions.readVersionWithVLongForBW(in);
fetchSourceContext = FetchSourceContext.optionalReadFromStream(in);
}
@ -298,7 +298,7 @@ public class GetRequest extends SingleShardOperationRequest<GetRequest> {
}
out.writeByte(versionType.getValue());
out.writeVLong(version);
Versions.writeVersionWithVLongForBW(version, out);
FetchSourceContext.optionalWriteToStream(fetchSourceContext, out);
}

View File

@ -169,7 +169,7 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> implements I
fields[i] = in.readString();
}
}
version = in.readVLong();
version = Versions.readVersionWithVLongForBW(in);
versionType = VersionType.fromValue(in.readByte());
fetchSourceContext = FetchSourceContext.optionalReadFromStream(in);
@ -190,7 +190,7 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> implements I
}
}
out.writeVLong(version);
Versions.writeVersionWithVLongForBW(version, out);
out.writeByte(versionType.getValue());
FetchSourceContext.optionalWriteToStream(fetchSourceContext, out);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.support.single.shard.SingleShardOperationRequest
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.search.fetch.source.FetchSourceContext;
@ -138,7 +139,7 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
} else {
fields.add(null);
}
versions.add(in.readVLong());
versions.add(Versions.readVersionWithVLongForBW(in));
versionTypes.add(VersionType.fromValue(in.readByte()));
fetchSourceContexts.add(FetchSourceContext.optionalReadFromStream(in));
@ -175,7 +176,7 @@ public class MultiGetShardRequest extends SingleShardOperationRequest<MultiGetSh
out.writeString(field);
}
}
out.writeVLong(versions.get(i));
Versions.writeVersionWithVLongForBW(versions.get(i), out);
out.writeByte(versionTypes.get(i).getValue());
FetchSourceContext fetchSourceContext = fetchSourceContexts.get(i);
FetchSourceContext.optionalWriteToStream(fetchSourceContext, out);

View File

@ -632,7 +632,7 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
opType = OpType.fromId(in.readByte());
refresh = in.readBoolean();
version = in.readLong();
version = Versions.readVersion(in);
versionType = VersionType.fromValue(in.readByte());
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
autoGeneratedId = in.readBoolean();
@ -651,7 +651,7 @@ public class IndexRequest extends ShardReplicationOperationRequest<IndexRequest>
out.writeBytesReference(source);
out.writeByte(opType.id());
out.writeBoolean(refresh);
out.writeLong(version);
Versions.writeVersion(version, out);
out.writeByte(versionType.getValue());
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeBoolean(autoGeneratedId);

View File

@ -609,7 +609,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
upsertRequest.readFrom(in);
}
docAsUpsert = in.readBoolean();
version = in.readLong();
version = Versions.readVersion(in);
versionType = VersionType.fromValue(in.readByte());
}
@ -655,7 +655,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
upsertRequest.writeTo(out);
}
out.writeBoolean(docAsUpsert);
out.writeLong(version);
Versions.writeVersion(version, out);
out.writeByte(versionType.getValue());
}

View File

@ -22,7 +22,10 @@ package org.elasticsearch.common.lucene.uid;
import org.apache.lucene.index.*;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
@ -32,11 +35,55 @@ import java.util.List;
/** Utility class to resolve the Lucene doc ID and version for a given uid. */
public class Versions {
public static final long MATCH_ANY = 0L; // Version was not specified by the user
public static final long MATCH_ANY = -3L; // Version was not specified by the user
// the value for MATCH_ANY before ES 1.2.0 - will be removed
public static final long MATCH_ANY_PRE_1_2_0 = 0L;
public static final long NOT_FOUND = -1L;
public static final long NOT_SET = -2L;
private Versions() {}
/**
 * Writes a version to the stream as a fixed-width long.
 * <p>
 * If the stream's version is before 1.2.0 and the value equals {@link #MATCH_ANY},
 * {@link #MATCH_ANY_PRE_1_2_0} is written instead, so that the receiving side gets
 * the "match any" value it knows about.
 *
 * @param version the version to serialize
 * @param out     the stream to write to; its version selects the encoding of "match any"
 * @throws IOException if writing to the stream fails
 */
public static void writeVersion(long version, StreamOutput out) throws IOException {
    final boolean targetsOlderFormat = out.getVersion().before(Version.V_1_2_0);
    // translate the current "match any" marker into the one used before 1.2.0
    final long wireValue = (targetsOlderFormat && version == MATCH_ANY) ? MATCH_ANY_PRE_1_2_0 : version;
    out.writeLong(wireValue);
}
/**
 * Reads a version that was written as a fixed-width long.
 * <p>
 * If the stream's version is before 1.2.0 and the value read equals
 * {@link #MATCH_ANY_PRE_1_2_0}, it is translated to the current {@link #MATCH_ANY}.
 *
 * @param in the stream to read from; its version decides whether translation applies
 * @return the version read, with the pre-1.2.0 "match any" value mapped to {@link #MATCH_ANY}
 * @throws IOException if reading from the stream fails
 */
public static long readVersion(StreamInput in) throws IOException {
    final long rawValue = in.readLong();
    final boolean fromOlderFormat = in.getVersion().before(Version.V_1_2_0);
    if (fromOlderFormat && rawValue == MATCH_ANY_PRE_1_2_0) {
        return MATCH_ANY;
    }
    return rawValue;
}
/**
 * Writes a version to the stream, choosing the encoding from the stream's version.
 * <p>
 * For streams on or after 1.2.0 the value is written unchanged as a fixed-width long.
 * For older streams it is written as a variable-length long, with {@link #MATCH_ANY}
 * replaced by {@link #MATCH_ANY_PRE_1_2_0} first.
 *
 * @param version the version to serialize
 * @param out     the stream to write to
 * @throws IOException if writing to the stream fails
 */
public static void writeVersionWithVLongForBW(long version, StreamOutput out) throws IOException {
    if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
        out.writeLong(version);
    } else {
        // we have to send out a value the older node will understand
        final long legacyValue = (version == MATCH_ANY) ? MATCH_ANY_PRE_1_2_0 : version;
        out.writeVLong(legacyValue);
    }
}
/**
 * Reads a version, choosing the decoding from the stream's version.
 * <p>
 * For streams on or after 1.2.0 the value is read as a fixed-width long and returned as is.
 * For older streams it is read as a variable-length long, and
 * {@link #MATCH_ANY_PRE_1_2_0} is translated to the current {@link #MATCH_ANY}.
 *
 * @param in the stream to read from
 * @return the version read, with the pre-1.2.0 "match any" value mapped to {@link #MATCH_ANY}
 * @throws IOException if reading from the stream fails
 */
public static long readVersionWithVLongForBW(StreamInput in) throws IOException {
    if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
        return in.readLong();
    }
    final long legacyValue = in.readVLong();
    return legacyValue == MATCH_ANY_PRE_1_2_0 ? MATCH_ANY : legacyValue;
}
private Versions() {
}
/** Wraps an {@link AtomicReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
public static class DocIdAndVersion {

View File

@ -40,7 +40,8 @@ public enum VersionType {
if (currentVersion == Versions.NOT_SET) {
return false;
}
if (expectedVersion == Versions.MATCH_ANY) {
// we need to allow the pre 1.2.0 match-any value as requests can come in from Java code where it may be hardcoded
if (expectedVersion == Versions.MATCH_ANY || expectedVersion == Versions.MATCH_ANY_PRE_1_2_0) {
return false;
}
if (currentVersion == Versions.NOT_FOUND) {
@ -60,13 +61,13 @@ public enum VersionType {
@Override
public boolean validateVersionForWrites(long version) {
// not allowing Versions.NOT_FOUND as it is not a valid input value.
return version > 0L || version == Versions.MATCH_ANY;
return version > 0L || version == Versions.MATCH_ANY || version == Versions.MATCH_ANY_PRE_1_2_0;
}
@Override
public boolean validateVersionForReads(long version) {
// not allowing Versions.NOT_FOUND as it is not a valid input value.
return version > 0L || version == Versions.MATCH_ANY;
return version > 0L || version == Versions.MATCH_ANY || version == Versions.MATCH_ANY_PRE_1_2_0;
}
@Override
@ -118,12 +119,12 @@ public enum VersionType {
@Override
public boolean validateVersionForWrites(long version) {
return version > 0L;
return version >= 0L;
}
@Override
public boolean validateVersionForReads(long version) {
return version > 0L || version == Versions.MATCH_ANY;
return version >= 0L || version == Versions.MATCH_ANY;
}
},
@ -169,12 +170,12 @@ public enum VersionType {
@Override
public boolean validateVersionForWrites(long version) {
return version > 0L;
return version >= 0L;
}
@Override
public boolean validateVersionForReads(long version) {
return version > 0L || version == Versions.MATCH_ANY;
return version >= 0L || version == Versions.MATCH_ANY;
}
},
@ -208,12 +209,12 @@ public enum VersionType {
@Override
public boolean validateVersionForWrites(long version) {
return version > 0L;
return version >= 0L;
}
@Override
public boolean validateVersionForReads(long version) {
return version > 0L || version == Versions.MATCH_ANY;
return version >= 0L || version == Versions.MATCH_ANY;
}
};

View File

@ -351,7 +351,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
}
}
if (version >= 3) {
this.version = in.readLong();
this.version = Versions.readVersion(in);
}
if (version >= 4) {
this.timestamp = in.readLong();
@ -384,7 +384,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
out.writeBoolean(true);
out.writeString(parent);
}
out.writeLong(version);
Versions.writeVersion(version, out);
out.writeLong(timestamp);
out.writeLong(ttl);
out.writeByte(versionType.getValue());
@ -494,7 +494,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
}
}
if (version >= 3) {
this.version = in.readLong();
this.version = Versions.readVersion(in);
}
if (version >= 4) {
this.timestamp = in.readLong();
@ -527,7 +527,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
out.writeBoolean(true);
out.writeString(parent);
}
out.writeLong(version);
Versions.writeVersion(version, out);
out.writeLong(timestamp);
out.writeLong(ttl);
out.writeByte(versionType.getValue());
@ -586,7 +586,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
int version = in.readVInt(); // version
uid = new Term(in.readString(), in.readString());
if (version >= 1) {
this.version = in.readLong();
this.version = Versions.readVersion(in);
}
if (version >= 2) {
this.versionType = VersionType.fromValue(in.readByte());
@ -600,7 +600,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent {
out.writeVInt(SERIALIZATION_FORMAT);
out.writeString(uid.field());
out.writeString(uid.text());
out.writeLong(version);
Versions.writeVersion(version, out);
out.writeByte(versionType.getValue());
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.query.QueryBuilders;
@ -40,13 +41,13 @@ public class RestActions {
public static long parseVersion(RestRequest request) {
if (request.hasParam("version")) {
return request.paramAsLong("version", 0);
return request.paramAsLong("version", Versions.MATCH_ANY);
}
String ifMatch = request.header("If-Match");
if (ifMatch != null) {
return Long.parseLong(ifMatch);
}
return 0;
return Versions.MATCH_ANY;
}
static final class Fields {

View File

@ -30,19 +30,31 @@ public class VersionTypeTests extends ElasticsearchTestCase {
public void testInternalVersionConflict() throws Exception {
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(10, Versions.MATCH_ANY));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(10, Versions.MATCH_ANY));
// if we don't have a version in the index we accept everything
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_SET, 10));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(Versions.NOT_SET, 10));
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_SET, Versions.MATCH_ANY));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(Versions.NOT_SET, Versions.MATCH_ANY));
// if we didn't find a version (but the index does support it), we don't like it unless MATCH_ANY
assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND));
assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, 10));
assertTrue(VersionType.INTERNAL.isVersionConflictForReads(Versions.NOT_FOUND, 10));
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.MATCH_ANY));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(Versions.NOT_FOUND, Versions.MATCH_ANY));
// test 0 is still matching any for backwards compatibility with versions <1.2.0
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(10, Versions.MATCH_ANY_PRE_1_2_0));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(10, Versions.MATCH_ANY_PRE_1_2_0));
// and the stupid usual case
assertFalse(VersionType.INTERNAL.isVersionConflictForWrites(10, 10));
assertFalse(VersionType.INTERNAL.isVersionConflictForReads(10, 10));
assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(9, 10));
assertTrue(VersionType.INTERNAL.isVersionConflictForReads(9, 10));
assertTrue(VersionType.INTERNAL.isVersionConflictForWrites(10, 9));
assertTrue(VersionType.INTERNAL.isVersionConflictForReads(10, 9));
// Old indexing code, dictating behavior
// if (expectedVersion != Versions.MATCH_ANY && currentVersion != Versions.NOT_SET) {