Merge remote-tracking branch 'dakrone/readd-force-versioning'

This commit is contained in:
Lee Hinman 2016-10-19 11:42:08 -06:00
commit f825988589
6 changed files with 108 additions and 1 deletions

View File

@ -90,6 +90,9 @@ public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest> impleme
if (!versionType.validateVersionForWrites(version)) {
validationException = addValidationError("illegal version value [" + version + "] for version type [" + versionType.name() + "]", validationException);
}
if (versionType == VersionType.FORCE) {
validationException = addValidationError("version type [force] may no longer be used", validationException);
}
return validationException;
}

View File

@ -160,6 +160,10 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
validationException = addValidationError("illegal version value [" + version + "] for version type [" + versionType.name() + "]", validationException);
}
if (versionType == VersionType.FORCE) {
validationException = addValidationError("version type [force] may no longer be used", validationException);
}
if (ttl != null) {
if (ttl.millis() < 0) {
validationException = addValidationError("ttl must not be negative", validationException);

View File

@ -141,7 +141,12 @@ public class UpdateHelper extends AbstractComponent {
return new Result(indexRequest, DocWriteResponse.Result.CREATED, null, null);
}
final long updateVersion = getResult.getVersion();
long updateVersion = getResult.getVersion();
if (request.versionType() != VersionType.INTERNAL) {
assert request.versionType() == VersionType.FORCE;
updateVersion = request.version(); // remember, match_any is excluded by the conflict test
}
if (getResult.internalSourceRef() == null) {
// no source, we can't do nothing, through a failure...

View File

@ -195,6 +195,55 @@ public enum VersionType implements Writeable {
return version >= 0L || version == Versions.MATCH_ANY;
}
},
/**
* Warning: this version type should be used with care. Concurrent indexing may result in loss of data on replicas
*
* @deprecated this will be removed in 7.0 and should not be used! It is *ONLY* for backward compatibility with 5.0 indices
*/
@Deprecated
FORCE((byte) 3) {
@Override
public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
if (currentVersion == Versions.NOT_FOUND) {
return false;
}
if (expectedVersion == Versions.MATCH_ANY) {
throw new IllegalStateException("you must specify a version when use VersionType.FORCE");
}
return false;
}
@Override
public String explainConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
throw new AssertionError("VersionType.FORCE should never result in a write conflict");
}
@Override
public boolean isVersionConflictForReads(long currentVersion, long expectedVersion) {
return false;
}
@Override
public String explainConflictForReads(long currentVersion, long expectedVersion) {
throw new AssertionError("VersionType.FORCE should never result in a read conflict");
}
@Override
public long updateVersion(long currentVersion, long expectedVersion) {
return expectedVersion;
}
@Override
public boolean validateVersionForWrites(long version) {
return version >= 0L;
}
@Override
public boolean validateVersionForReads(long version) {
return version >= 0L || version == Versions.MATCH_ANY;
}
};
private final byte value;
@ -288,6 +337,8 @@ public enum VersionType implements Writeable {
return EXTERNAL;
} else if ("external_gte".equals(versionType)) {
return EXTERNAL_GTE;
} else if ("force".equals(versionType)) {
return FORCE;
}
throw new IllegalArgumentException("No version type match [" + versionType + "]");
}
@ -306,6 +357,8 @@ public enum VersionType implements Writeable {
return EXTERNAL;
} else if (value == 2) {
return EXTERNAL_GTE;
} else if (value == 3) {
return FORCE;
}
throw new IllegalArgumentException("No version type match [" + value + "]");
}

View File

@ -353,6 +353,16 @@ public class InternalEngine extends Engine {
final long currentVersion,
final long expectedVersion,
final boolean deleted) {
if (op.versionType() == VersionType.FORCE) {
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1)) {
// If index was created in 5.0 or later, 'force' is not allowed at all
throw new IllegalArgumentException("version type [FORCE] may not be used for indices created after 6.0");
} else if (op.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
// For earlier indices, 'force' is only allowed for translog recovery
throw new IllegalArgumentException("version type [FORCE] may not be used for non-translog operations");
}
}
if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
if (op.origin().isRecovery()) {
// version conflict, but okay

View File

@ -135,6 +135,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
@ -1116,6 +1117,37 @@ public class InternalEngineTests extends ESTestCase {
}
}
public void testForceVersioningNotAllowedExceptForOlderIndices() throws Exception {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc, 42, VersionType.FORCE, PRIMARY, 0, -1, false);
try {
engine.index(index);
fail("should have failed due to using VersionType.FORCE");
} catch (IllegalArgumentException iae) {
assertThat(iae.getMessage(), containsString("version type [FORCE] may not be used for indices created after 6.0"));
}
IndexSettings oldIndexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_5_0_0_beta1)
.build());
try (Store store = createStore();
Engine engine = createEngine(oldIndexSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
index = new Engine.Index(newUid("1"), doc, 84, VersionType.FORCE, PRIMARY, 0, -1, false);
try {
engine.index(index);
fail("should have failed due to using VersionType.FORCE");
} catch (IllegalArgumentException iae) {
assertThat(iae.getMessage(), containsString("version type [FORCE] may not be used for non-translog operations"));
}
index = new Engine.Index(newUid("1"), doc, 84, VersionType.FORCE,
Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, 0, -1, false);
engine.index(index);
assertThat(index.version(), equalTo(84L));
}
}
public void testVersioningIndexConflictWithFlush() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);