Distinguish environment failures from transient operation failures for write operations
Currently, we treat all write operation exceptions as equals, but in reality every write operation can cause either an environment failure (i.e. a failure that should fail the engine e.g. data corruption, lucene tragic events) or operation failure (i.e. a failure that is transient w.r.t the operation e.g. parsing exception). This change bubbles up enironment failures from the engine, after failing the engine but captures transient operation failures as part of the operation to be processed appopriately at the transport level.
This commit is contained in:
parent
1b1f484c28
commit
415fdee828
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.support.replication.ReplicationOperation;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -29,6 +28,7 @@ import org.elasticsearch.common.logging.LoggerMessageFormat;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.engine.OperationFailedEngineException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
|
@ -487,8 +487,7 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException::new, 26),
|
||||
SNAPSHOT_CREATION_EXCEPTION(org.elasticsearch.snapshots.SnapshotCreationException.class,
|
||||
org.elasticsearch.snapshots.SnapshotCreationException::new, 27),
|
||||
DELETE_FAILED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.DeleteFailedEngineException.class,
|
||||
org.elasticsearch.index.engine.DeleteFailedEngineException::new, 28),
|
||||
// 28 was DeleteFailedEngineException
|
||||
DOCUMENT_MISSING_EXCEPTION(org.elasticsearch.index.engine.DocumentMissingException.class,
|
||||
org.elasticsearch.index.engine.DocumentMissingException::new, 29),
|
||||
SNAPSHOT_EXCEPTION(org.elasticsearch.snapshots.SnapshotException.class,
|
||||
|
@ -581,8 +580,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
org.elasticsearch.action.TimestampParsingException::new, 78),
|
||||
ROUTING_MISSING_EXCEPTION(org.elasticsearch.action.RoutingMissingException.class,
|
||||
org.elasticsearch.action.RoutingMissingException::new, 79),
|
||||
INDEX_FAILED_ENGINE_EXCEPTION(org.elasticsearch.index.engine.IndexFailedEngineException.class,
|
||||
org.elasticsearch.index.engine.IndexFailedEngineException::new, 80),
|
||||
OPERATION_FAILED_ENGINE_EXCEPTION(OperationFailedEngineException.class,
|
||||
OperationFailedEngineException::new, 80),
|
||||
INDEX_SHARD_RESTORE_FAILED_EXCEPTION(org.elasticsearch.index.snapshots.IndexShardRestoreFailedException.class,
|
||||
org.elasticsearch.index.snapshots.IndexShardRestoreFailedException::new, 81),
|
||||
REPOSITORY_EXCEPTION(org.elasticsearch.repositories.RepositoryException.class,
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class DeleteFailedEngineException extends EngineException {
|
||||
|
||||
public DeleteFailedEngineException(ShardId shardId, Engine.Delete delete, Throwable cause) {
|
||||
super(shardId, "Delete failed for [" + delete.uid().text() + "]", cause);
|
||||
}
|
||||
|
||||
public DeleteFailedEngineException(StreamInput in) throws IOException{
|
||||
super(in);
|
||||
}
|
||||
}
|
|
@ -77,6 +77,7 @@ import java.util.Base64;
|
|||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -277,9 +278,9 @@ public abstract class Engine implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
public abstract void index(Index operation) throws EngineException;
|
||||
public abstract void index(Index operation) throws OperationFailedEngineException;
|
||||
|
||||
public abstract void delete(Delete delete) throws EngineException;
|
||||
public abstract void delete(Delete delete) throws OperationFailedEngineException;
|
||||
|
||||
/**
|
||||
* Attempts to do a special commit where the given syncID is put into the commit data. The attempt
|
||||
|
@ -767,11 +768,28 @@ public abstract class Engine implements Closeable {
|
|||
}
|
||||
|
||||
public abstract static class Operation {
|
||||
|
||||
/** type of operation (index, delete), subclasses use static types */
|
||||
public enum TYPE {
|
||||
INDEX, DELETE;
|
||||
|
||||
private final String lowercase;
|
||||
|
||||
TYPE() {
|
||||
this.lowercase = this.toString().toLowerCase(Locale.ROOT);
|
||||
}
|
||||
|
||||
public String getLowercase() {
|
||||
return lowercase;
|
||||
}
|
||||
}
|
||||
|
||||
private final Term uid;
|
||||
private long version;
|
||||
private final VersionType versionType;
|
||||
private final Origin origin;
|
||||
private Translog.Location location;
|
||||
private Exception failure;
|
||||
private final long startTime;
|
||||
private long endTime;
|
||||
|
||||
|
@ -818,6 +836,18 @@ public abstract class Engine implements Closeable {
|
|||
return this.location;
|
||||
}
|
||||
|
||||
public Exception getFailure() {
|
||||
return failure;
|
||||
}
|
||||
|
||||
public void setFailure(Exception failure) {
|
||||
this.failure = failure;
|
||||
}
|
||||
|
||||
public boolean hasFailure() {
|
||||
return failure != null;
|
||||
}
|
||||
|
||||
public int sizeInBytes() {
|
||||
if (location != null) {
|
||||
return location.size;
|
||||
|
@ -853,6 +883,8 @@ public abstract class Engine implements Closeable {
|
|||
abstract String type();
|
||||
|
||||
abstract String id();
|
||||
|
||||
abstract TYPE operationType();
|
||||
}
|
||||
|
||||
public static class Index extends Operation {
|
||||
|
@ -892,6 +924,11 @@ public abstract class Engine implements Closeable {
|
|||
return this.doc.id();
|
||||
}
|
||||
|
||||
@Override
|
||||
TYPE operationType() {
|
||||
return TYPE.INDEX;
|
||||
}
|
||||
|
||||
public String routing() {
|
||||
return this.doc.routing();
|
||||
}
|
||||
|
@ -985,6 +1022,11 @@ public abstract class Engine implements Closeable {
|
|||
return this.id;
|
||||
}
|
||||
|
||||
@Override
|
||||
TYPE operationType() {
|
||||
return TYPE.DELETE;
|
||||
}
|
||||
|
||||
public void updateVersion(long version, boolean found) {
|
||||
updateVersion(version);
|
||||
this.found = found;
|
||||
|
|
|
@ -397,7 +397,7 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void index(Index index) {
|
||||
public void index(Index index) throws OperationFailedEngineException {
|
||||
try (ReleasableLock lock = readLock.acquire()) {
|
||||
ensureOpen();
|
||||
if (index.origin().isRecovery()) {
|
||||
|
@ -408,13 +408,31 @@ public class InternalEngine extends Engine {
|
|||
innerIndex(index);
|
||||
}
|
||||
}
|
||||
} catch (IllegalStateException | IOException e) {
|
||||
try {
|
||||
maybeFailEngine("index", e);
|
||||
} catch (Exception inner) {
|
||||
e.addSuppressed(inner);
|
||||
} catch (Exception e) {
|
||||
handleOperationFailure(index, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When indexing a document into Lucene, Lucene distinguishes between environment related errors
|
||||
* (like out of disk space) and document specific errors (like analysis chain problems) by setting
|
||||
* the IndexWriter.getTragicEvent() value for the former. maybeFailEngine checks for these kind of
|
||||
* errors and returns true if that is the case. We use that to indicate a document level failure
|
||||
* and set the error in operation.setFailure. In case of environment related errors, the failure
|
||||
* is bubbled up
|
||||
*/
|
||||
private void handleOperationFailure(final Operation operation, final Exception e) throws OperationFailedEngineException {
|
||||
try {
|
||||
if (maybeFailEngine(operation.operationType().getLowercase(), e)) {
|
||||
throw new OperationFailedEngineException(shardId,
|
||||
operation.operationType().getLowercase(), operation.type(), operation.id(), e);
|
||||
} else {
|
||||
operation.setFailure(e);
|
||||
}
|
||||
throw new IndexFailedEngineException(shardId, index.type(), index.id(), e);
|
||||
} catch (Exception inner) {
|
||||
e.addSuppressed(inner);
|
||||
throw new OperationFailedEngineException(shardId,
|
||||
operation.operationType().getLowercase(), operation.type(), operation.id(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -545,18 +563,13 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void delete(Delete delete) throws EngineException {
|
||||
public void delete(Delete delete) throws OperationFailedEngineException {
|
||||
try (ReleasableLock lock = readLock.acquire()) {
|
||||
ensureOpen();
|
||||
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
|
||||
innerDelete(delete);
|
||||
} catch (IllegalStateException | IOException e) {
|
||||
try {
|
||||
maybeFailEngine("delete", e);
|
||||
} catch (Exception inner) {
|
||||
e.addSuppressed(inner);
|
||||
}
|
||||
throw new DeleteFailedEngineException(shardId, delete, e);
|
||||
} catch (Exception e) {
|
||||
handleOperationFailure(delete, e);
|
||||
}
|
||||
|
||||
maybePruneDeletedTombstones();
|
||||
|
|
|
@ -26,21 +26,21 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class IndexFailedEngineException extends EngineException {
|
||||
public class OperationFailedEngineException extends EngineException {
|
||||
|
||||
private final String type;
|
||||
|
||||
private final String id;
|
||||
|
||||
public IndexFailedEngineException(ShardId shardId, String type, String id, Throwable cause) {
|
||||
super(shardId, "Index failed for [" + type + "#" + id + "]", cause);
|
||||
public OperationFailedEngineException(ShardId shardId, String operationType, String type, String id, Throwable cause) {
|
||||
super(shardId, operationType + " failed for [" + type + "#" + id + "]", cause);
|
||||
Objects.requireNonNull(type, "type must not be null");
|
||||
Objects.requireNonNull(id, "id must not be null");
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public IndexFailedEngineException(StreamInput in) throws IOException{
|
||||
public OperationFailedEngineException(StreamInput in) throws IOException{
|
||||
super(in);
|
||||
type = in.readString();
|
||||
id = in.readString();
|
|
@ -106,12 +106,12 @@ public class ShadowEngine extends Engine {
|
|||
|
||||
|
||||
@Override
|
||||
public void index(Index index) throws EngineException {
|
||||
public void index(Index index) throws OperationFailedEngineException {
|
||||
throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(Delete delete) throws EngineException {
|
||||
public void delete(Delete delete) throws OperationFailedEngineException {
|
||||
throw new UnsupportedOperationException(shardId + " delete operation not allowed on shadow engine");
|
||||
}
|
||||
|
||||
|
|
|
@ -555,7 +555,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
indexingOperationListeners.postIndex(index, e);
|
||||
throw e;
|
||||
}
|
||||
indexingOperationListeners.postIndex(index, index.isCreated());
|
||||
if (index.hasFailure()) {
|
||||
indexingOperationListeners.postIndex(index, index.getFailure());
|
||||
} else {
|
||||
indexingOperationListeners.postIndex(index, index.isCreated());
|
||||
}
|
||||
}
|
||||
|
||||
public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) {
|
||||
|
@ -599,8 +603,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
indexingOperationListeners.postDelete(delete, e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
indexingOperationListeners.postDelete(delete);
|
||||
if (delete.hasFailure()) {
|
||||
indexingOperationListeners.postDelete(delete, delete.getFailure());
|
||||
} else {
|
||||
indexingOperationListeners.postDelete(delete);
|
||||
}
|
||||
}
|
||||
|
||||
public Engine.GetResult get(Engine.Get get) {
|
||||
|
|
|
@ -51,7 +51,7 @@ import org.elasticsearch.common.xcontent.XContentLocation;
|
|||
import org.elasticsearch.discovery.DiscoverySettings;
|
||||
import org.elasticsearch.index.AlreadyExpiredException;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.engine.IndexFailedEngineException;
|
||||
import org.elasticsearch.index.engine.OperationFailedEngineException;
|
||||
import org.elasticsearch.index.engine.RecoveryEngineException;
|
||||
import org.elasticsearch.index.query.QueryShardException;
|
||||
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
||||
|
@ -402,13 +402,13 @@ public class ExceptionSerializationTests extends ESTestCase {
|
|||
|
||||
public void testIndexFailedEngineException() throws IOException {
|
||||
ShardId id = new ShardId("foo", "_na_", 1);
|
||||
IndexFailedEngineException ex = serialize(new IndexFailedEngineException(id, "type", "id", null));
|
||||
OperationFailedEngineException ex = serialize(new OperationFailedEngineException(id, "index", "type", "id", null));
|
||||
assertEquals(ex.getShardId(), new ShardId("foo", "_na_", 1));
|
||||
assertEquals("type", ex.type());
|
||||
assertEquals("id", ex.id());
|
||||
assertNull(ex.getCause());
|
||||
|
||||
ex = serialize(new IndexFailedEngineException(null, "type", "id", new NullPointerException()));
|
||||
ex = serialize(new OperationFailedEngineException(null, "index", "type", "id", new NullPointerException()));
|
||||
assertNull(ex.getShardId());
|
||||
assertEquals("type", ex.type());
|
||||
assertEquals("id", ex.id());
|
||||
|
@ -680,7 +680,7 @@ public class ExceptionSerializationTests extends ESTestCase {
|
|||
ids.put(25, org.elasticsearch.script.GeneralScriptException.class);
|
||||
ids.put(26, org.elasticsearch.index.shard.TranslogRecoveryPerformer.BatchOperationException.class);
|
||||
ids.put(27, org.elasticsearch.snapshots.SnapshotCreationException.class);
|
||||
ids.put(28, org.elasticsearch.index.engine.DeleteFailedEngineException.class);
|
||||
ids.put(28, null); // was DeleteFailedEngineException
|
||||
ids.put(29, org.elasticsearch.index.engine.DocumentMissingException.class);
|
||||
ids.put(30, org.elasticsearch.snapshots.SnapshotException.class);
|
||||
ids.put(31, org.elasticsearch.indices.InvalidAliasNameException.class);
|
||||
|
@ -732,7 +732,7 @@ public class ExceptionSerializationTests extends ESTestCase {
|
|||
ids.put(77, org.elasticsearch.common.util.concurrent.UncategorizedExecutionException.class);
|
||||
ids.put(78, org.elasticsearch.action.TimestampParsingException.class);
|
||||
ids.put(79, org.elasticsearch.action.RoutingMissingException.class);
|
||||
ids.put(80, org.elasticsearch.index.engine.IndexFailedEngineException.class);
|
||||
ids.put(80, OperationFailedEngineException.class);
|
||||
ids.put(81, org.elasticsearch.index.snapshots.IndexShardRestoreFailedException.class);
|
||||
ids.put(82, org.elasticsearch.repositories.RepositoryException.class);
|
||||
ids.put(83, org.elasticsearch.transport.ReceiveTimeoutTransportException.class);
|
||||
|
|
|
@ -68,11 +68,9 @@ import org.elasticsearch.common.lucene.uid.Versions;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.analysis.AnalysisRegistry;
|
||||
import org.elasticsearch.index.analysis.AnalyzerScope;
|
||||
import org.elasticsearch.index.analysis.IndexAnalyzers;
|
||||
import org.elasticsearch.index.analysis.NamedAnalyzer;
|
||||
|
@ -135,6 +133,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
|
||||
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.everyItem;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
@ -1080,21 +1079,15 @@ public class InternalEngineTests extends ESTestCase {
|
|||
assertThat(index.version(), equalTo(2L));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, 0, -1, false);
|
||||
try {
|
||||
engine.index(index);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.index(index);
|
||||
assertTrue(index.hasFailure());
|
||||
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
|
||||
// future versions should not work as well
|
||||
index = new Engine.Index(newUid("1"), doc, 3L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
|
||||
try {
|
||||
engine.index(index);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.index(index);
|
||||
assertTrue(index.hasFailure());
|
||||
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
}
|
||||
|
||||
public void testExternalVersioningIndexConflict() {
|
||||
|
@ -1108,12 +1101,9 @@ public class InternalEngineTests extends ESTestCase {
|
|||
assertThat(index.version(), equalTo(14L));
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
|
||||
try {
|
||||
engine.index(index);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.index(index);
|
||||
assertTrue(index.hasFailure());
|
||||
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
}
|
||||
|
||||
public void testVersioningIndexConflictWithFlush() {
|
||||
|
@ -1129,21 +1119,15 @@ public class InternalEngineTests extends ESTestCase {
|
|||
engine.flush();
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
|
||||
try {
|
||||
engine.index(index);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.index(index);
|
||||
assertTrue(index.hasFailure());
|
||||
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
|
||||
// future versions should not work as well
|
||||
index = new Engine.Index(newUid("1"), doc, 3L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
|
||||
try {
|
||||
engine.index(index);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.index(index);
|
||||
assertTrue(index.hasFailure());
|
||||
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
}
|
||||
|
||||
public void testExternalVersioningIndexConflictWithFlush() {
|
||||
|
@ -1159,12 +1143,9 @@ public class InternalEngineTests extends ESTestCase {
|
|||
engine.flush();
|
||||
|
||||
index = new Engine.Index(newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
|
||||
try {
|
||||
engine.index(index);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.index(index);
|
||||
assertTrue(index.hasFailure());
|
||||
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
}
|
||||
|
||||
public void testForceMerge() throws IOException {
|
||||
|
@ -1273,21 +1254,15 @@ public class InternalEngineTests extends ESTestCase {
|
|||
assertThat(index.version(), equalTo(2L));
|
||||
|
||||
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1L, VersionType.INTERNAL, PRIMARY, 0, false);
|
||||
try {
|
||||
engine.delete(delete);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.delete(delete);
|
||||
assertTrue(delete.hasFailure());
|
||||
assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
|
||||
// future versions should not work as well
|
||||
delete = new Engine.Delete("test", "1", newUid("1"), 3L, VersionType.INTERNAL, PRIMARY, 0, false);
|
||||
try {
|
||||
engine.delete(delete);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.delete(delete);
|
||||
assertTrue(delete.hasFailure());
|
||||
assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
|
||||
// now actually delete
|
||||
delete = new Engine.Delete("test", "1", newUid("1"), 2L, VersionType.INTERNAL, PRIMARY, 0, false);
|
||||
|
@ -1296,20 +1271,9 @@ public class InternalEngineTests extends ESTestCase {
|
|||
|
||||
// now check if we can index to a delete doc with version
|
||||
index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
|
||||
try {
|
||||
engine.index(index);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
|
||||
// we shouldn't be able to create as well
|
||||
Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
|
||||
try {
|
||||
engine.index(create);
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.index(index);
|
||||
assertTrue(index.hasFailure());
|
||||
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
}
|
||||
|
||||
public void testVersioningDeleteConflictWithFlush() {
|
||||
|
@ -1325,21 +1289,15 @@ public class InternalEngineTests extends ESTestCase {
|
|||
engine.flush();
|
||||
|
||||
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1L, VersionType.INTERNAL, PRIMARY, 0, false);
|
||||
try {
|
||||
engine.delete(delete);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.delete(delete);
|
||||
assertTrue(delete.hasFailure());
|
||||
assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
|
||||
// future versions should not work as well
|
||||
delete = new Engine.Delete("test", "1", newUid("1"), 3L, VersionType.INTERNAL, PRIMARY, 0, false);
|
||||
try {
|
||||
engine.delete(delete);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.delete(delete);
|
||||
assertTrue(delete.hasFailure());
|
||||
assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
|
||||
engine.flush();
|
||||
|
||||
|
@ -1352,20 +1310,9 @@ public class InternalEngineTests extends ESTestCase {
|
|||
|
||||
// now check if we can index to a delete doc with version
|
||||
index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
|
||||
try {
|
||||
engine.index(index);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
|
||||
// we shouldn't be able to create as well
|
||||
Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
|
||||
try {
|
||||
engine.index(create);
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.index(index);
|
||||
assertTrue(index.hasFailure());
|
||||
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
}
|
||||
|
||||
public void testVersioningCreateExistsException() {
|
||||
|
@ -1375,12 +1322,9 @@ public class InternalEngineTests extends ESTestCase {
|
|||
assertThat(create.version(), equalTo(1L));
|
||||
|
||||
create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
|
||||
try {
|
||||
engine.index(create);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.index(create);
|
||||
assertTrue(create.hasFailure());
|
||||
assertThat(create.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
}
|
||||
|
||||
public void testVersioningCreateExistsExceptionWithFlush() {
|
||||
|
@ -1392,12 +1336,9 @@ public class InternalEngineTests extends ESTestCase {
|
|||
engine.flush();
|
||||
|
||||
create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
|
||||
try {
|
||||
engine.index(create);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
engine.index(create);
|
||||
assertTrue(create.hasFailure());
|
||||
assertThat(create.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
}
|
||||
|
||||
public void testVersioningReplicaConflict1() {
|
||||
|
@ -1417,22 +1358,16 @@ public class InternalEngineTests extends ESTestCase {
|
|||
|
||||
// now, the old one should not work
|
||||
index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
|
||||
try {
|
||||
replicaEngine.index(index);
|
||||
fail();
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
replicaEngine.index(index);
|
||||
assertTrue(index.hasFailure());
|
||||
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
|
||||
// second version on replica should fail as well
|
||||
try {
|
||||
index = new Engine.Index(newUid("1"), doc, 2L
|
||||
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
|
||||
replicaEngine.index(index);
|
||||
assertThat(index.version(), equalTo(2L));
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
index = new Engine.Index(newUid("1"), doc, 2L
|
||||
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
|
||||
replicaEngine.index(index);
|
||||
assertThat(index.version(), equalTo(2L));
|
||||
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
}
|
||||
|
||||
public void testVersioningReplicaConflict2() {
|
||||
|
@ -1464,23 +1399,17 @@ public class InternalEngineTests extends ESTestCase {
|
|||
assertThat(delete.version(), equalTo(3L));
|
||||
|
||||
// second time delete with same version should fail
|
||||
try {
|
||||
delete = new Engine.Delete("test", "1", newUid("1"), 3L
|
||||
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, false);
|
||||
replicaEngine.delete(delete);
|
||||
fail("excepted VersionConflictEngineException to be thrown");
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
delete = new Engine.Delete("test", "1", newUid("1"), 3L
|
||||
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, false);
|
||||
replicaEngine.delete(delete);
|
||||
assertTrue(delete.hasFailure());
|
||||
assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
|
||||
// now do the second index on the replica, it should fail
|
||||
try {
|
||||
index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
|
||||
replicaEngine.index(index);
|
||||
fail("excepted VersionConflictEngineException to be thrown");
|
||||
} catch (VersionConflictEngineException e) {
|
||||
// all is well
|
||||
}
|
||||
index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
|
||||
replicaEngine.index(index);
|
||||
assertTrue(index.hasFailure());
|
||||
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
}
|
||||
|
||||
public void testBasicCreatedFlag() {
|
||||
|
@ -1636,24 +1565,20 @@ public class InternalEngineTests extends ESTestCase {
|
|||
assertThat(getResult.exists(), equalTo(false));
|
||||
|
||||
// Try to index uid=1 with a too-old version, should fail:
|
||||
try {
|
||||
engine.index(new Engine.Index(newUid("1"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false));
|
||||
fail("did not hit expected exception");
|
||||
} catch (VersionConflictEngineException vcee) {
|
||||
// expected
|
||||
}
|
||||
Engine.Index index = new Engine.Index(newUid("1"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false);
|
||||
engine.index(index);
|
||||
assertTrue(index.hasFailure());
|
||||
assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
|
||||
// Get should still not find the document
|
||||
getResult = engine.get(new Engine.Get(true, newUid("1")));
|
||||
assertThat(getResult.exists(), equalTo(false));
|
||||
|
||||
// Try to index uid=2 with a too-old version, should fail:
|
||||
try {
|
||||
engine.index(new Engine.Index(newUid("2"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false));
|
||||
fail("did not hit expected exception");
|
||||
} catch (VersionConflictEngineException vcee) {
|
||||
// expected
|
||||
}
|
||||
Engine.Index index1 = new Engine.Index(newUid("2"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false);
|
||||
engine.index(index1);
|
||||
assertTrue(index1.hasFailure());
|
||||
assertThat(index1.getFailure(), instanceOf(VersionConflictEngineException.class));
|
||||
|
||||
// Get should not find the document
|
||||
getResult = engine.get(new Engine.Get(true, newUid("2")));
|
||||
|
|
|
@ -359,7 +359,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
try {
|
||||
indexShard.acquirePrimaryOperationLock(null, ThreadPool.Names.INDEX);
|
||||
fail("shard shouldn't accept primary ops");
|
||||
} catch (IllegalStateException ignored) {
|
||||
} catch (ShardNotFoundException ignored) {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -489,8 +489,8 @@ public class IndexStatsIT extends ESIntegTestCase {
|
|||
} catch (VersionConflictEngineException e) {}
|
||||
|
||||
stats = client().admin().indices().prepareStats().setTypes("type1", "type2").execute().actionGet();
|
||||
assertThat(stats.getIndex("test1").getTotal().getIndexing().getTotal().getIndexFailedCount(), equalTo(2L));
|
||||
assertThat(stats.getIndex("test2").getTotal().getIndexing().getTotal().getIndexFailedCount(), equalTo(1L));
|
||||
assertThat(stats.getIndex("test1").getPrimaries().getIndexing().getTotal().getIndexFailedCount(), equalTo(2L));
|
||||
assertThat(stats.getIndex("test2").getPrimaries().getIndexing().getTotal().getIndexFailedCount(), equalTo(1L));
|
||||
assertThat(stats.getPrimaries().getIndexing().getTypeStats().get("type1").getIndexFailedCount(), equalTo(1L));
|
||||
assertThat(stats.getPrimaries().getIndexing().getTypeStats().get("type2").getIndexFailedCount(), equalTo(1L));
|
||||
assertThat(stats.getTotal().getIndexing().getTotal().getIndexFailedCount(), equalTo(3L));
|
||||
|
|
Loading…
Reference in New Issue