Don’t ack if unable to remove failing replica (#39584)

Today when a replicated write operation fails to execute on a replica,
the primary will reach out to the master to fail that replica (and mark
it stale). We then won't ack that request until the master removes the
failing replica; otherwise, we will lose the acked operation if the
failed replica is still in the in-sync set. However, if a node with the
primary is shutting down, we might ack such request even though we are
unable to send a shard-failure request to the master. This happens
because we ignore NodeClosedException which is triggered when the
ClusterService is being closed.

Closes #39467
This commit is contained in:
Nhat Nguyen 2019-03-05 11:22:03 -05:00
parent 3591da6ff8
commit 1fe7cb594f
10 changed files with 135 additions and 130 deletions

View File

@ -46,7 +46,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Consumer;
public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {
@ -130,10 +129,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
}
@Override
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final Runnable onSuccess,
final Consumer<Exception> onPrimaryDemoted, final Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener);
}
}

View File

@ -48,7 +48,6 @@ import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
@ -210,10 +209,9 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
}
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(
replica.shardId(), replica.allocationId().getId(), primaryTerm, false, message, exception, listener);
}
}
}

View File

@ -21,12 +21,14 @@ package org.elasticsearch.action.support.replication;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
@ -34,7 +36,9 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TransportException;
import java.io.IOException;
import java.util.ArrayList;
@ -43,7 +47,6 @@ import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
public class ReplicationOperation<
Request extends ReplicationRequest<Request>,
@ -133,10 +136,7 @@ public class ReplicationOperation<
for (String allocationId : replicationGroup.getUnavailableInSyncShards()) {
pendingActions.incrementAndGet();
replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId,
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
);
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
}
}
@ -192,9 +192,8 @@ public class ReplicationOperation<
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
}
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, message,
replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
replicasProxy.failShardIfNeeded(shard, message, replicaException,
ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
}
@Override
@ -204,13 +203,26 @@ public class ReplicationOperation<
});
}
private void onPrimaryDemoted(Exception demotionFailure) {
String primaryFail = String.format(Locale.ROOT,
"primary shard [%s] was demoted while failing replica shard",
primary.routingEntry());
// we are no longer the primary, fail ourselves and start over
primary.failShard(primaryFail, demotionFailure);
finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), primaryFail, demotionFailure));
private void onNoLongerPrimary(Exception failure) {
final boolean nodeIsClosing = failure instanceof NodeClosedException ||
(failure instanceof TransportException && "TransportService is closed stopped can't send request".equals(failure.getMessage()));
final String message;
if (nodeIsClosing) {
message = String.format(Locale.ROOT,
"node with primary [%s] is shutting down while failing replica shard", primary.routingEntry());
// We prefer not to fail the primary to avoid unnecessary warning log
// when the node with the primary shard is gracefully shutting down.
} else {
if (Assertions.ENABLED) {
if (failure instanceof ShardStateAction.NoLongerPrimaryShardException == false) {
throw new AssertionError("unexpected failure", failure);
}
}
// we are no longer the primary, fail ourselves and start over
message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard", primary.routingEntry());
primary.failShard(message, failure);
}
finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), message, failure));
}
/**
@ -370,31 +382,23 @@ public class ReplicationOperation<
* of active shards. Whether a failure is needed is left up to the
* implementation.
*
* @param replica shard to fail
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param onSuccess a callback to call when the shard has been successfully removed from the active set.
* @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted
* by the master.
* @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the
* @param replica shard to fail
* @param message a (short) description of the reason
* @param exception the original exception which caused the ReplicationOperation to request the shard to be failed
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
*/
void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener);
/**
* Marks shard copy as stale if needed, removing its allocation id from
* the set of in-sync allocation ids. Whether marking as stale is needed
* is left up to the implementation.
*
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
* @param onSuccess a callback to call when the allocation id has been successfully removed from the in-sync set.
* @param onPrimaryDemoted a callback to call when the request failed because the current primary was already demoted
* by the master.
* @param onIgnoredFailure a callback to call when the request failed, but the failure can be safely ignored.
* @param shardId shard id
* @param allocationId allocation id to remove from the set of in-sync allocation ids
* @param listener a listener that will be notified when the failing shard has been removed from the in-sync set
*/
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure);
void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener);
}
/**

View File

@ -84,7 +84,6 @@ import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
@ -1173,47 +1172,21 @@ public abstract class TransportReplicationAction<
}
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
// This does not need to fail the shard. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be
// "alive" if it were to fail.
onSuccess.run();
listener.onResponse(null);
}
@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
// This does not need to make the shard stale. The idea is that this
// is a non-write operation (something like a refresh or a global
// checkpoint sync) and therefore the replica should still be
// "alive" if it were to be marked as stale.
onSuccess.run();
}
protected final ActionListener<Void> createShardActionListener(final Runnable onSuccess,
final Consumer<Exception> onPrimaryDemoted,
final Consumer<Exception> onIgnoredFailure) {
return new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
onSuccess.run();
}
@Override
public void onFailure(Exception shardFailedError) {
if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) {
onPrimaryDemoted.accept(shardFailedError);
} else {
// these can occur if the node is shutting down and are okay
// any other exception here is not expected and merits investigation
assert shardFailedError instanceof TransportException ||
shardFailedError instanceof NodeClosedException : shardFailedError;
onIgnoredFailure.accept(shardFailedError);
}
}
};
listener.onResponse(null);
}
}

View File

@ -47,7 +47,6 @@ import org.elasticsearch.transport.TransportService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
@ -376,20 +375,17 @@ public abstract class TransportWriteAction<
}
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
if (TransportActions.isShardNotAvailableException(exception) == false) {
logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
}
shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
shardStateAction.remoteShardFailed(
replica.shardId(), replica.allocationId().getId(), primaryTerm, true, message, exception, listener);
}
@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener);
}
}
}

View File

@ -21,14 +21,16 @@ package org.elasticsearch.action.support.replication;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -39,7 +41,9 @@ import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportException;
import java.util.ArrayList;
import java.util.Collections;
@ -51,7 +55,6 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
@ -115,10 +118,8 @@ public class ReplicationOperationTests extends ESTestCase {
final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, simulatedFailures);
final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup);
final TestReplicationOperation op = new TestReplicationOperation(request,
primary, listener, replicasProxy);
final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy);
op.execute();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
assertThat(replicasProxy.failedReplicas, equalTo(simulatedFailures.keySet()));
@ -162,7 +163,7 @@ public class ReplicationOperationTests extends ESTestCase {
}
}
public void testDemotedPrimary() throws Exception {
public void testNoLongerPrimary() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
@ -198,26 +199,32 @@ public class ReplicationOperationTests extends ESTestCase {
Request request = new Request(shardId);
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
final boolean testPrimaryDemotedOnStaleShardCopies = randomBoolean();
final Exception shardActionFailure;
if (randomBoolean()) {
shardActionFailure = new NodeClosedException(new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT));
} else if (randomBoolean()) {
shardActionFailure = new TransportException("TransportService is closed stopped can't send request");
} else {
shardActionFailure = new ShardStateAction.NoLongerPrimaryShardException(failedReplica.shardId(), "the king is dead");
}
final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures) {
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted,
Consumer<Exception> onIgnoredFailure) {
ActionListener<Void> shardActionListener) {
if (testPrimaryDemotedOnStaleShardCopies) {
super.failShardIfNeeded(replica, message, exception, onSuccess, onPrimaryDemoted, onIgnoredFailure);
super.failShardIfNeeded(replica, message, exception, shardActionListener);
} else {
assertThat(replica, equalTo(failedReplica));
onPrimaryDemoted.accept(new ElasticsearchException("the king is dead"));
shardActionListener.onFailure(shardActionFailure);
}
}
@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> shardActionListener) {
if (testPrimaryDemotedOnStaleShardCopies) {
onPrimaryDemoted.accept(new ElasticsearchException("the king is dead"));
shardActionListener.onFailure(shardActionFailure);
} else {
super.markShardCopyAsStaleIfNeeded(shardId, allocationId, onSuccess, onPrimaryDemoted, onIgnoredFailure);
super.markShardCopyAsStaleIfNeeded(shardId, allocationId, shardActionListener);
}
}
};
@ -225,6 +232,7 @@ public class ReplicationOperationTests extends ESTestCase {
final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup) {
@Override
public void failShard(String message, Exception exception) {
assertThat(exception, instanceOf(ShardStateAction.NoLongerPrimaryShardException.class));
assertTrue(primaryFailed.compareAndSet(false, true));
}
};
@ -233,7 +241,11 @@ public class ReplicationOperationTests extends ESTestCase {
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
assertTrue("listener is not marked as done", listener.isDone());
assertTrue(primaryFailed.get());
if (shardActionFailure instanceof ShardStateAction.NoLongerPrimaryShardException) {
assertTrue(primaryFailed.get());
} else {
assertFalse(primaryFailed.get());
}
assertListenerThrows("should throw exception to trigger retry", listener,
ReplicationOperation.RetryOnPrimaryException.class);
}
@ -594,33 +606,23 @@ public class ReplicationOperationTests extends ESTestCase {
}
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
if (failedReplicas.add(replica) == false) {
fail("replica [" + replica + "] was failed twice");
}
if (opFailures.containsKey(replica)) {
if (randomBoolean()) {
onSuccess.run();
} else {
onIgnoredFailure.accept(new ElasticsearchException("simulated"));
}
listener.onResponse(null);
} else {
fail("replica [" + replica + "] was failed");
}
}
@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
if (markedAsStaleCopies.add(allocationId) == false) {
fail("replica [" + allocationId + "] was marked as stale twice");
}
if (randomBoolean()) {
onSuccess.run();
} else {
onIgnoredFailure.accept(new ElasticsearchException("simulated"));
}
listener.onResponse(null);
}
}

View File

@ -744,11 +744,9 @@ public class TransportReplicationActionTests extends ESTestCase {
}
AtomicReference<Object> failure = new AtomicReference<>();
AtomicReference<Object> ignoredFailure = new AtomicReference<>();
AtomicBoolean success = new AtomicBoolean();
proxy.failShardIfNeeded(replica, "test", new ElasticsearchException("simulated"),
() -> success.set(true), failure::set, ignoredFailure::set
);
ActionListener.wrap(r -> success.set(true), failure::set));
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
// A replication action doesn't not fail the request
assertEquals(0, shardFailedRequests.length);

View File

@ -311,11 +311,9 @@ public class TransportWriteActionTests extends ESTestCase {
}
AtomicReference<Object> failure = new AtomicReference<>();
AtomicReference<Object> ignoredFailure = new AtomicReference<>();
AtomicBoolean success = new AtomicBoolean();
proxy.failShardIfNeeded(replica, "test", new ElasticsearchException("simulated"),
() -> success.set(true), failure::set, ignoredFailure::set
);
ActionListener.wrap(r -> success.set(true), failure::set));
CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
// A write replication action proxy should fail the shard
assertEquals(1, shardFailedRequests.length);
@ -329,8 +327,6 @@ public class TransportWriteActionTests extends ESTestCase {
transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE);
assertTrue(success.get());
assertNull(failure.get());
assertNull(ignoredFailure.get());
} else if (randomBoolean()) {
// simulate the primary has been demoted
transport.handleRemoteError(shardFailedRequest.requestId,
@ -338,15 +334,12 @@ public class TransportWriteActionTests extends ESTestCase {
"shard-failed-test"));
assertFalse(success.get());
assertNotNull(failure.get());
assertNull(ignoredFailure.get());
} else {
// simulated an "ignored" exception
// simulated a node closing exception
transport.handleRemoteError(shardFailedRequest.requestId,
new NodeClosedException(state.nodes().getLocalNode()));
assertFalse(success.get());
assertNull(failure.get());
assertNotNull(ignoredFailure.get());
assertNotNull(failure.get());
}
}

View File

@ -37,8 +37,12 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.NetworkDisruption;
@ -52,6 +56,7 @@ import org.elasticsearch.test.junit.annotations.TestLogging;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -66,6 +71,7 @@ import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.not;
@ -436,4 +442,46 @@ public class ClusterDisruptionIT extends AbstractDisruptionTestCase {
assertFalse(client().admin().indices().prepareExists(idxName).get().isExists());
}
public void testRestartNodeWhileIndexing() throws Exception {
startCluster(3);
String index = "restart_while_indexing";
assertAcked(client().admin().indices().prepareCreate(index).setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, between(1, 2))));
AtomicBoolean stopped = new AtomicBoolean();
Thread[] threads = new Thread[between(1, 4)];
AtomicInteger docID = new AtomicInteger();
Set<String> ackedDocs = ConcurrentCollections.newConcurrentSet();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
while (stopped.get() == false && docID.get() < 5000) {
String id = Integer.toString(docID.incrementAndGet());
try {
IndexResponse response = client().prepareIndex(index, "_doc", id).setSource("{}", XContentType.JSON).get();
assertThat(response.getResult(), isOneOf(CREATED, UPDATED));
logger.info("--> index id={} seq_no={}", response.getId(), response.getSeqNo());
ackedDocs.add(response.getId());
} catch (ElasticsearchException ignore) {
logger.info("--> fail to index id={}", id);
}
}
});
threads[i].start();
}
ensureGreen(index);
assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(100)));
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback());
ensureGreen(index);
assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(200)));
stopped.set(true);
for (Thread thread : threads) {
thread.join();
}
ClusterState clusterState = internalCluster().clusterService().state();
for (ShardRouting shardRouting : clusterState.routingTable().allShards(index)) {
String nodeName = clusterState.nodes().get(shardRouting.currentNodeId()).getName();
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
IndexShard indexShard = indicesService.getShardOrNull(shardRouting.shardId());
assertThat(IndexShardTestCase.getShardDocUIDs(indexShard), equalTo(ackedDocs));
}
}
}

View File

@ -96,7 +96,6 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -721,15 +720,12 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
@Override
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
Runnable onSuccess, Consumer<Exception> onPrimaryDemoted,
Consumer<Exception> onIgnoredFailure) {
public void failShardIfNeeded(ShardRouting replica, String message, Exception exception, ActionListener<Void> listener) {
throw new UnsupportedOperationException("failing shard " + replica + " isn't supported. failure: " + message, exception);
}
@Override
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, Runnable onSuccess,
Consumer<Exception> onPrimaryDemoted, Consumer<Exception> onIgnoredFailure) {
public void markShardCopyAsStaleIfNeeded(ShardId shardId, String allocationId, ActionListener<Void> listener) {
throw new UnsupportedOperationException("can't mark " + shardId + ", aid [" + allocationId + "] as stale");
}
}