parent
e57720e091
commit
a7827b8ccd
|
@ -141,7 +141,12 @@ public class UpdateHelper extends AbstractComponent {
|
||||||
return new Result(indexRequest, DocWriteResponse.Result.CREATED, null, null);
|
return new Result(indexRequest, DocWriteResponse.Result.CREATED, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
final long updateVersion = getResult.getVersion();
|
long updateVersion = getResult.getVersion();
|
||||||
|
|
||||||
|
if (request.versionType() != VersionType.INTERNAL) {
|
||||||
|
assert request.versionType() == VersionType.FORCE;
|
||||||
|
updateVersion = request.version(); // remember, match_any is excluded by the conflict test
|
||||||
|
}
|
||||||
|
|
||||||
if (getResult.internalSourceRef() == null) {
|
if (getResult.internalSourceRef() == null) {
|
||||||
// no source, we can't do nothing, through a failure...
|
// no source, we can't do nothing, through a failure...
|
||||||
|
|
|
@ -110,9 +110,8 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
|
||||||
validationException = addValidationError("id is missing", validationException);
|
validationException = addValidationError("id is missing", validationException);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (versionType != VersionType.INTERNAL) {
|
if (!(versionType == VersionType.INTERNAL || versionType == VersionType.FORCE)) {
|
||||||
validationException = addValidationError("version type [" + versionType + "] is not supported by the update API",
|
validationException = addValidationError("version type [" + versionType + "] is not supported by the update API", validationException);
|
||||||
validationException);
|
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
if (version != Versions.MATCH_ANY && retryOnConflict > 0) {
|
if (version != Versions.MATCH_ANY && retryOnConflict > 0) {
|
||||||
|
|
|
@ -195,6 +195,52 @@ public enum VersionType implements Writeable {
|
||||||
return version >= 0L || version == Versions.MATCH_ANY;
|
return version >= 0L || version == Versions.MATCH_ANY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
},
|
||||||
|
/**
|
||||||
|
* Warning: this version type should be used with care. Concurrent indexing may result in loss of data on replicas
|
||||||
|
*/
|
||||||
|
FORCE((byte) 3) {
|
||||||
|
@Override
|
||||||
|
public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
|
||||||
|
if (currentVersion == Versions.NOT_FOUND) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (expectedVersion == Versions.MATCH_ANY) {
|
||||||
|
throw new IllegalStateException("you must specify a version when use VersionType.FORCE");
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String explainConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
|
||||||
|
throw new AssertionError("VersionType.FORCE should never result in a write conflict");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isVersionConflictForReads(long currentVersion, long expectedVersion) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String explainConflictForReads(long currentVersion, long expectedVersion) {
|
||||||
|
throw new AssertionError("VersionType.FORCE should never result in a read conflict");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long updateVersion(long currentVersion, long expectedVersion) {
|
||||||
|
return expectedVersion;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean validateVersionForWrites(long version) {
|
||||||
|
return version >= 0L;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean validateVersionForReads(long version) {
|
||||||
|
return version >= 0L || version == Versions.MATCH_ANY;
|
||||||
|
}
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
private final byte value;
|
private final byte value;
|
||||||
|
@ -288,6 +334,8 @@ public enum VersionType implements Writeable {
|
||||||
return EXTERNAL;
|
return EXTERNAL;
|
||||||
} else if ("external_gte".equals(versionType)) {
|
} else if ("external_gte".equals(versionType)) {
|
||||||
return EXTERNAL_GTE;
|
return EXTERNAL_GTE;
|
||||||
|
} else if ("force".equals(versionType)) {
|
||||||
|
return FORCE;
|
||||||
}
|
}
|
||||||
throw new IllegalArgumentException("No version type match [" + versionType + "]");
|
throw new IllegalArgumentException("No version type match [" + versionType + "]");
|
||||||
}
|
}
|
||||||
|
@ -306,6 +354,8 @@ public enum VersionType implements Writeable {
|
||||||
return EXTERNAL;
|
return EXTERNAL;
|
||||||
} else if (value == 2) {
|
} else if (value == 2) {
|
||||||
return EXTERNAL_GTE;
|
return EXTERNAL_GTE;
|
||||||
|
} else if (value == 3) {
|
||||||
|
return FORCE;
|
||||||
}
|
}
|
||||||
throw new IllegalArgumentException("No version type match [" + value + "]");
|
throw new IllegalArgumentException("No version type match [" + value + "]");
|
||||||
}
|
}
|
||||||
|
|
|
@ -236,11 +236,14 @@ public class BulkWithUpdatesIT extends ESIntegTestCase {
|
||||||
.add(client().prepareUpdate("test", "type", "e1")
|
.add(client().prepareUpdate("test", "type", "e1")
|
||||||
.setDoc("field", "2").setVersion(10)) // INTERNAL
|
.setDoc("field", "2").setVersion(10)) // INTERNAL
|
||||||
.add(client().prepareUpdate("test", "type", "e1")
|
.add(client().prepareUpdate("test", "type", "e1")
|
||||||
.setDoc("field", "3").setVersion(13).setVersionType(VersionType.INTERNAL))
|
.setDoc("field", "3").setVersion(20).setVersionType(VersionType.FORCE))
|
||||||
|
.add(client().prepareUpdate("test", "type", "e1")
|
||||||
|
.setDoc("field", "4").setVersion(20).setVersionType(VersionType.INTERNAL))
|
||||||
.get();
|
.get();
|
||||||
|
|
||||||
assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("version conflict"));
|
assertThat(bulkResponse.getItems()[0].getFailureMessage(), containsString("version conflict"));
|
||||||
assertThat(bulkResponse.getItems()[1].getFailureMessage(), containsString("version conflict"));
|
assertThat(bulkResponse.getItems()[1].getResponse().getVersion(), equalTo(20L));
|
||||||
|
assertThat(bulkResponse.getItems()[2].getResponse().getVersion(), equalTo(21L));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testBulkUpdateMalformedScripts() throws Exception {
|
public void testBulkUpdateMalformedScripts() throws Exception {
|
||||||
|
|
|
@ -77,6 +77,13 @@ public class VersionTypeTests extends ESTestCase {
|
||||||
assertTrue(VersionType.EXTERNAL_GTE.validateVersionForReads(randomIntBetween(1, Integer.MAX_VALUE)));
|
assertTrue(VersionType.EXTERNAL_GTE.validateVersionForReads(randomIntBetween(1, Integer.MAX_VALUE)));
|
||||||
assertFalse(VersionType.EXTERNAL_GTE.validateVersionForReads(randomIntBetween(Integer.MIN_VALUE, -1)));
|
assertFalse(VersionType.EXTERNAL_GTE.validateVersionForReads(randomIntBetween(Integer.MIN_VALUE, -1)));
|
||||||
|
|
||||||
|
assertTrue(VersionType.FORCE.validateVersionForWrites(randomIntBetween(1, Integer.MAX_VALUE)));
|
||||||
|
assertFalse(VersionType.FORCE.validateVersionForWrites(Versions.MATCH_ANY));
|
||||||
|
assertFalse(VersionType.FORCE.validateVersionForWrites(randomIntBetween(Integer.MIN_VALUE, 0)));
|
||||||
|
assertTrue(VersionType.FORCE.validateVersionForReads(Versions.MATCH_ANY));
|
||||||
|
assertTrue(VersionType.FORCE.validateVersionForReads(randomIntBetween(1, Integer.MAX_VALUE)));
|
||||||
|
assertFalse(VersionType.FORCE.validateVersionForReads(randomIntBetween(Integer.MIN_VALUE, -1)));
|
||||||
|
|
||||||
assertTrue(VersionType.INTERNAL.validateVersionForWrites(randomIntBetween(1, Integer.MAX_VALUE)));
|
assertTrue(VersionType.INTERNAL.validateVersionForWrites(randomIntBetween(1, Integer.MAX_VALUE)));
|
||||||
assertTrue(VersionType.INTERNAL.validateVersionForWrites(Versions.MATCH_ANY));
|
assertTrue(VersionType.INTERNAL.validateVersionForWrites(Versions.MATCH_ANY));
|
||||||
assertFalse(VersionType.INTERNAL.validateVersionForWrites(randomIntBetween(Integer.MIN_VALUE, 0)));
|
assertFalse(VersionType.INTERNAL.validateVersionForWrites(randomIntBetween(Integer.MIN_VALUE, 0)));
|
||||||
|
@ -146,6 +153,36 @@ public class VersionTypeTests extends ESTestCase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testForceVersionConflict() throws Exception {
|
||||||
|
assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
|
||||||
|
|
||||||
|
// MATCH_ANY must throw an exception in the case of force version, as the version must be set! it used as the new value
|
||||||
|
try {
|
||||||
|
VersionType.FORCE.isVersionConflictForWrites(10, Versions.MATCH_ANY, randomBoolean());
|
||||||
|
fail();
|
||||||
|
} catch (IllegalStateException e) {
|
||||||
|
//yes!!
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we didn't find a version (but the index does support it), we always accept
|
||||||
|
assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, Versions.NOT_FOUND, randomBoolean()));
|
||||||
|
assertFalse(VersionType.FORCE.isVersionConflictForWrites(Versions.NOT_FOUND, 10, randomBoolean()));
|
||||||
|
|
||||||
|
assertFalse(VersionType.FORCE.isVersionConflictForReads(Versions.NOT_FOUND, Versions.NOT_FOUND));
|
||||||
|
assertFalse(VersionType.FORCE.isVersionConflictForReads(Versions.NOT_FOUND, 10));
|
||||||
|
assertFalse(VersionType.FORCE.isVersionConflictForReads(Versions.NOT_FOUND, Versions.MATCH_ANY));
|
||||||
|
|
||||||
|
|
||||||
|
// and the standard behavior
|
||||||
|
assertFalse(VersionType.FORCE.isVersionConflictForWrites(10, 10, randomBoolean()));
|
||||||
|
assertFalse(VersionType.FORCE.isVersionConflictForWrites(9, 10, randomBoolean()));
|
||||||
|
assertFalse(VersionType.FORCE.isVersionConflictForWrites(10, 9, randomBoolean()));
|
||||||
|
assertFalse(VersionType.FORCE.isVersionConflictForReads(10, 10));
|
||||||
|
assertFalse(VersionType.FORCE.isVersionConflictForReads(9, 10));
|
||||||
|
assertFalse(VersionType.FORCE.isVersionConflictForReads(10, 9));
|
||||||
|
assertFalse(VersionType.FORCE.isVersionConflictForReads(10, Versions.MATCH_ANY));
|
||||||
|
}
|
||||||
|
|
||||||
public void testUpdateVersion() {
|
public void testUpdateVersion() {
|
||||||
assertThat(VersionType.INTERNAL.updateVersion(Versions.NOT_FOUND, 10), equalTo(1L));
|
assertThat(VersionType.INTERNAL.updateVersion(Versions.NOT_FOUND, 10), equalTo(1L));
|
||||||
assertThat(VersionType.INTERNAL.updateVersion(1, 1), equalTo(2L));
|
assertThat(VersionType.INTERNAL.updateVersion(1, 1), equalTo(2L));
|
||||||
|
@ -159,6 +196,9 @@ public class VersionTypeTests extends ESTestCase {
|
||||||
assertThat(VersionType.EXTERNAL_GTE.updateVersion(1, 10), equalTo(10L));
|
assertThat(VersionType.EXTERNAL_GTE.updateVersion(1, 10), equalTo(10L));
|
||||||
assertThat(VersionType.EXTERNAL_GTE.updateVersion(10, 10), equalTo(10L));
|
assertThat(VersionType.EXTERNAL_GTE.updateVersion(10, 10), equalTo(10L));
|
||||||
|
|
||||||
|
assertThat(VersionType.FORCE.updateVersion(Versions.NOT_FOUND, 10), equalTo(10L));
|
||||||
|
assertThat(VersionType.FORCE.updateVersion(11, 10), equalTo(10L));
|
||||||
|
|
||||||
// Old indexing code
|
// Old indexing code
|
||||||
// if (index.versionType() == VersionType.INTERNAL) { // internal version type
|
// if (index.versionType() == VersionType.INTERNAL) { // internal version type
|
||||||
// updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
|
// updatedVersion = (currentVersion == Versions.NOT_SET || currentVersion == Versions.NOT_FOUND) ? 1 : currentVersion + 1;
|
||||||
|
|
|
@ -527,9 +527,15 @@ public class UpdateIT extends ESIntegTestCase {
|
||||||
.setVersionType(VersionType.EXTERNAL).execute(),
|
.setVersionType(VersionType.EXTERNAL).execute(),
|
||||||
ActionRequestValidationException.class);
|
ActionRequestValidationException.class);
|
||||||
|
|
||||||
|
|
||||||
|
// With force version
|
||||||
|
client().prepareUpdate(indexOrAlias(), "type", "2")
|
||||||
|
.setScript(new Script("", ScriptService.ScriptType.INLINE, "put_values", Collections.singletonMap("text", "v10")))
|
||||||
|
.setVersion(10).setVersionType(VersionType.FORCE).get();
|
||||||
|
|
||||||
GetResponse get = get("test", "type", "2");
|
GetResponse get = get("test", "type", "2");
|
||||||
assertThat(get.getVersion(), equalTo(10L));
|
assertThat(get.getVersion(), equalTo(10L));
|
||||||
assertThat((String) get.getSource().get("text"), equalTo("value"));
|
assertThat((String) get.getSource().get("text"), equalTo("v10"));
|
||||||
|
|
||||||
// upserts - the combination with versions is a bit weird. Test are here to ensure we do not change our behavior unintentionally
|
// upserts - the combination with versions is a bit weird. Test are here to ensure we do not change our behavior unintentionally
|
||||||
|
|
||||||
|
|
|
@ -68,6 +68,36 @@ public class SimpleVersioningIT extends ESIntegTestCase {
|
||||||
assertThat(indexResponse.getVersion(), equalTo(18L));
|
assertThat(indexResponse.getVersion(), equalTo(18L));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testForce() throws Exception {
|
||||||
|
createIndex("test");
|
||||||
|
ensureGreen("test"); // we are testing force here which doesn't work if we are recovering at the same time - zzzzz...
|
||||||
|
IndexResponse indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(12).setVersionType(VersionType.FORCE).get();
|
||||||
|
assertThat(indexResponse.getVersion(), equalTo(12L));
|
||||||
|
|
||||||
|
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(12).setVersionType(VersionType.FORCE).get();
|
||||||
|
assertThat(indexResponse.getVersion(), equalTo(12L));
|
||||||
|
|
||||||
|
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(14).setVersionType(VersionType.FORCE).get();
|
||||||
|
assertThat(indexResponse.getVersion(), equalTo(14L));
|
||||||
|
|
||||||
|
indexResponse = client().prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(13).setVersionType(VersionType.FORCE).get();
|
||||||
|
assertThat(indexResponse.getVersion(), equalTo(13L));
|
||||||
|
|
||||||
|
client().admin().indices().prepareRefresh().execute().actionGet();
|
||||||
|
if (randomBoolean()) {
|
||||||
|
refresh();
|
||||||
|
}
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
assertThat(client().prepareGet("test", "type", "1").get().getVersion(), equalTo(13L));
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleting with a lower version works.
|
||||||
|
long v = randomIntBetween(12, 14);
|
||||||
|
DeleteResponse deleteResponse = client().prepareDelete("test", "type", "1").setVersion(v).setVersionType(VersionType.FORCE).get();
|
||||||
|
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
|
||||||
|
assertThat(deleteResponse.getVersion(), equalTo(v));
|
||||||
|
}
|
||||||
|
|
||||||
public void testExternalGTE() throws Exception {
|
public void testExternalGTE() throws Exception {
|
||||||
createIndex("test");
|
createIndex("test");
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
---
|
||||||
|
"Force version":
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
body: { foo: bar }
|
||||||
|
version_type: force
|
||||||
|
version: 5
|
||||||
|
|
||||||
|
- match: { _version: 5}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
delete:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
version_type: force
|
||||||
|
version: 4
|
||||||
|
|
||||||
|
- match: { _version: 4}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
body: { foo: bar }
|
||||||
|
version_type: force
|
||||||
|
version: 6
|
||||||
|
|
||||||
|
- match: { _version: 6}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
delete:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
version_type: force
|
||||||
|
version: 6
|
||||||
|
|
||||||
|
- match: { _version: 6}
|
|
@ -86,3 +86,30 @@
|
||||||
id: 1
|
id: 1
|
||||||
version: 1
|
version: 1
|
||||||
version_type: external_gte
|
version_type: external_gte
|
||||||
|
|
||||||
|
- do:
|
||||||
|
get:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
version: 2
|
||||||
|
version_type: force
|
||||||
|
- match: { _id: "1" }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
get:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
version: 10
|
||||||
|
version_type: force
|
||||||
|
- match: { _id: "1" }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
get:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
version: 1
|
||||||
|
version_type: force
|
||||||
|
- match: { _id: "1" }
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
---
|
||||||
|
"Force version":
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
body: { foo: bar }
|
||||||
|
version_type: force
|
||||||
|
version: 5
|
||||||
|
|
||||||
|
- match: { _version: 5}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
body: { foo: bar }
|
||||||
|
version_type: force
|
||||||
|
version: 4
|
||||||
|
|
||||||
|
- match: { _version: 4}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
body: { foo: bar2 }
|
||||||
|
version_type: force
|
||||||
|
version: 5
|
||||||
|
|
||||||
|
- match: { _version: 5}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
index:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
body: { foo: bar3 }
|
||||||
|
version_type: force
|
||||||
|
version: 5
|
||||||
|
|
||||||
|
- match: { _version: 5}
|
|
@ -86,3 +86,30 @@
|
||||||
id: 1
|
id: 1
|
||||||
version: 1
|
version: 1
|
||||||
version_type: external_gte
|
version_type: external_gte
|
||||||
|
|
||||||
|
- do:
|
||||||
|
get:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
version: 2
|
||||||
|
version_type: force
|
||||||
|
- match: { _id: "1" }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
get:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
version: 10
|
||||||
|
version_type: force
|
||||||
|
- match: { _id: "1" }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
get:
|
||||||
|
index: test_1
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
version: 1
|
||||||
|
version_type: force
|
||||||
|
- match: { _id: "1" }
|
||||||
|
|
Loading…
Reference in New Issue