[TEST] Add test for retrying replica operations with real network

Related to #24745
This commit is contained in:
Lee Hinman 2017-05-17 16:17:33 -06:00
parent a64937db7a
commit 7a6db074ee
1 changed files with 67 additions and 0 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.support.replication;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@ -49,10 +50,14 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
@ -63,12 +68,16 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.ClusterStateChanges;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
@ -82,10 +91,13 @@ import org.junit.Before;
import org.junit.BeforeClass;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -918,6 +930,61 @@ public class TransportReplicationActionTests extends ESTestCase {
assertConcreteShardRequest(capturedRequest.request, request, replica.allocationId());
}
public void testRetryOnReplicaWithRealTransport() throws Exception {
final ShardId shardId = new ShardId("test", "_na_", 0);
final ClusterState initialState = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED);
final ShardRouting replica = initialState.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0);
// simulate execution of the node holding the replica
final ClusterState stateWithNodes = ClusterState.builder(initialState)
.nodes(DiscoveryNodes.builder(initialState.nodes()).localNodeId(replica.currentNodeId())).build();
setState(clusterService, stateWithNodes);
AtomicBoolean throwException = new AtomicBoolean(true);
final ReplicationTask task = maybeTask();
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
final Transport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()),
Version.CURRENT);
transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> clusterService.localNode(),null);
transportService.start();
transportService.acceptIncomingRequests();
AtomicBoolean calledSuccessfully = new AtomicBoolean(false);
TestAction action = new TestAction(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
threadPool) {
@Override
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
assertPhase(task, "replica");
if (throwException.get()) {
throw new RetryOnReplicaException(shardId, "simulation");
}
calledSuccessfully.set(true);
return new ReplicaResult();
}
};
final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
final PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final Request request = new Request().setShardId(shardId);
final long checkpoint = randomNonNegativeLong();
request.primaryTerm(stateWithNodes.metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()));
replicaOperationTransportHandler.messageReceived(
new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), checkpoint),
createTransportChannel(listener), task);
if (listener.isDone()) {
listener.get(); // fail with the exception if there
fail("listener shouldn't be done");
}
// release the waiting
throwException.set(false);
// publish a new state (same as the old state with the version incremented)
setState(clusterService, stateWithNodes);
// Assert that the request was retried, this time successfull
assertTrue("action should have been successfully called on retry but was not", calledSuccessfully.get());
transportService.stop();
}
private void assertConcreteShardRequest(TransportRequest capturedRequest, Request expectedRequest, AllocationId expectedAllocationId) {
final TransportReplicationAction.ConcreteShardRequest<?> concreteShardRequest =
(TransportReplicationAction.ConcreteShardRequest<?>) capturedRequest;