Ensure to generate identical NoOp for the same failure (#33141)

We generate slightly different NoOps in InternalEngine and
TransportShardBulkAction for the same failure.

1. InternalEngine uses Exception#getFailure to generate a message
without the class name: newOp [NoOp{seqNo=1, primaryTerm=1,
reason='Contexts are mandatory in context enabled completion field
[suggest_context]'}].

2. TransportShardBulkAction uses Exception#toString to generate a
message with the class name: NoOp{seqNo=1, primaryTerm=1,
reason='java.lang.IllegalArgumentException: Contexts are mandatory in
context enabled completion field [suggest_context]'}.

If a write operation fails while a replica is recovering, that replica
will possibly receive two different NoOps: one from recovery and one
from replication. These two different NoOps will trip
TranslogWriter#assertNoSeqNumberConflict assertion.

This commit ensures that we generate the same Noop for the same failure.

Closes #32986
This commit is contained in:
Nhat Nguyen 2018-08-27 15:59:42 -04:00 committed by GitHub
parent ed0571e16c
commit 014b3236dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 78 additions and 75 deletions

View File

@ -802,7 +802,7 @@ public class InternalEngine extends Engine {
location = translog.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog with the generated seq_no
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().toString()));
} else {
location = null;
}
@ -1111,7 +1111,7 @@ public class InternalEngine extends Engine {
location = translog.add(new Translog.Delete(delete, deleteResult));
} else if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(),
delete.primaryTerm(), deleteResult.getFailure().getMessage()));
delete.primaryTerm(), deleteResult.getFailure().toString()));
} else {
location = null;
}

View File

@ -36,7 +36,6 @@ import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineTests;
@ -47,6 +46,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTests;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.TestThreadPool;
@ -54,6 +54,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matcher;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -338,38 +339,73 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
* for primary and replica shards
*/
public void testDocumentFailureReplication() throws Exception {
final String failureMessage = "simulated document failure";
final ThrowingDocumentFailureEngineFactory throwingDocumentFailureEngineFactory =
new ThrowingDocumentFailureEngineFactory(failureMessage);
final IOException indexException = new IOException("simulated indexing failure");
final IOException deleteException = new IOException("simulated deleting failure");
final EngineFactory engineFactory = config -> InternalEngineTests.createInternalEngine((dir, iwc) ->
new IndexWriter(dir, iwc) {
final AtomicBoolean throwAfterIndexedOneDoc = new AtomicBoolean(); // need one document to trigger delete in IW.
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
if (throwAfterIndexedOneDoc.getAndSet(true)) {
throw indexException;
} else {
return super.addDocument(doc);
}
}
@Override
public long deleteDocuments(Term... terms) throws IOException {
throw deleteException;
}
}, null, null, config);
try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
@Override
protected EngineFactory getEngineFactory(ShardRouting routing) {
return throwingDocumentFailureEngineFactory;
}}) {
protected EngineFactory getEngineFactory(ShardRouting routing) { return engineFactory; }}) {
// test only primary
// start with the primary only so two first failures are replicated to replicas via recovery from the translog of the primary.
shards.startPrimary();
BulkItemResponse response = shards.index(
new IndexRequest(index.getName(), "type", "1")
.source("{}", XContentType.JSON)
);
assertTrue(response.isFailed());
assertNoOpTranslogOperationForDocumentFailure(shards, 1, shards.getPrimary().getPendingPrimaryTerm(), failureMessage);
shards.assertAllEqual(0);
long primaryTerm = shards.getPrimary().getPendingPrimaryTerm();
List<Translog.Operation> expectedTranslogOps = new ArrayList<>();
BulkItemResponse indexResp = shards.index(new IndexRequest(index.getName(), "type", "1").source("{}", XContentType.JSON));
assertThat(indexResp.isFailed(), equalTo(false));
expectedTranslogOps.add(new Translog.Index("type", "1", 0, primaryTerm, 1, "{}".getBytes(StandardCharsets.UTF_8), null, -1));
try (Translog.Snapshot snapshot = getTranslog(shards.getPrimary()).newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
}
indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
expectedTranslogOps.add(new Translog.NoOp(1, primaryTerm, indexException.toString()));
BulkItemResponse deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1"));
assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException));
expectedTranslogOps.add(new Translog.NoOp(2, primaryTerm, deleteException.toString()));
shards.assertAllEqual(1);
// add some replicas
int nReplica = randomIntBetween(1, 3);
for (int i = 0; i < nReplica; i++) {
shards.addReplica();
}
shards.startReplicas(nReplica);
response = shards.index(
new IndexRequest(index.getName(), "type", "1")
.source("{}", XContentType.JSON)
);
assertTrue(response.isFailed());
assertNoOpTranslogOperationForDocumentFailure(shards, 2, shards.getPrimary().getPendingPrimaryTerm(), failureMessage);
shards.assertAllEqual(0);
for (IndexShard shard : shards) {
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
}
}
// unlike previous failures, these two failures replicated directly from the replication channel.
indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON));
assertThat(indexResp.getFailure().getCause(), equalTo(indexException));
expectedTranslogOps.add(new Translog.NoOp(3, primaryTerm, indexException.toString()));
deleteResp = shards.delete(new DeleteRequest(index.getName(), "type", "1"));
assertThat(deleteResp.getFailure().getCause(), equalTo(deleteException));
expectedTranslogOps.add(new Translog.NoOp(4, primaryTerm, deleteException.toString()));
for (IndexShard shard : shards) {
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps));
}
}
shards.assertAllEqual(1);
}
}
@ -541,47 +577,4 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
shards.assertAllEqual(0);
}
}
/** Throws <code>documentFailure</code> on every indexing operation */
static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
final String documentFailureMessage;
ThrowingDocumentFailureEngineFactory(String documentFailureMessage) {
this.documentFailureMessage = documentFailureMessage;
}
@Override
public Engine newReadWriteEngine(EngineConfig config) {
return InternalEngineTests.createInternalEngine((directory, writerConfig) ->
new IndexWriter(directory, writerConfig) {
@Override
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
assert documentFailureMessage != null;
throw new IOException(documentFailureMessage);
}
}, null, null, config);
}
}
private static void assertNoOpTranslogOperationForDocumentFailure(
Iterable<IndexShard> replicationGroup,
int expectedOperation,
long expectedPrimaryTerm,
String failureMessage) throws IOException {
for (IndexShard indexShard : replicationGroup) {
try(Translog.Snapshot snapshot = getTranslog(indexShard).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(expectedOperation));
long expectedSeqNo = 0L;
Translog.Operation op = snapshot.next();
do {
assertThat(op.opType(), equalTo(Translog.Operation.Type.NO_OP));
assertThat(op.seqNo(), equalTo(expectedSeqNo));
assertThat(op.primaryTerm(), equalTo(expectedPrimaryTerm));
assertThat(((Translog.NoOp) op).reason(), containsString(failureMessage));
op = snapshot.next();
expectedSeqNo++;
} while (op != null);
}
}
}
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.action.resync.ResyncReplicationRequest;
import org.elasticsearch.action.resync.ResyncReplicationResponse;
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
@ -193,14 +194,23 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
public BulkItemResponse index(IndexRequest indexRequest) throws Exception {
return executeWriteRequest(indexRequest, indexRequest.getRefreshPolicy());
}
public BulkItemResponse delete(DeleteRequest deleteRequest) throws Exception {
return executeWriteRequest(deleteRequest, deleteRequest.getRefreshPolicy());
}
private BulkItemResponse executeWriteRequest(
DocWriteRequest<?> writeRequest, WriteRequest.RefreshPolicy refreshPolicy) throws Exception {
PlainActionFuture<BulkItemResponse> listener = new PlainActionFuture<>();
final ActionListener<BulkShardResponse> wrapBulkListener = ActionListener.wrap(
bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]),
listener::onFailure);
bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]),
listener::onFailure);
BulkItemRequest[] items = new BulkItemRequest[1];
items[0] = new BulkItemRequest(0, indexRequest);
BulkShardRequest request = new BulkShardRequest(shardId, indexRequest.getRefreshPolicy(), items);
new IndexingAction(request, wrapBulkListener, this).execute();
items[0] = new BulkItemRequest(0, writeRequest);
BulkShardRequest request = new BulkShardRequest(shardId, refreshPolicy, items);
new WriteReplicationAction(request, wrapBulkListener, this).execute();
return listener.get();
}
@ -598,9 +608,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
class IndexingAction extends ReplicationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
class WriteReplicationAction extends ReplicationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
IndexingAction(BulkShardRequest request, ActionListener<BulkShardResponse> listener, ReplicationGroup replicationGroup) {
WriteReplicationAction(BulkShardRequest request, ActionListener<BulkShardResponse> listener, ReplicationGroup replicationGroup) {
super(request, listener, replicationGroup, "indexing");
}