diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 89026d9d1db..f91fab381d3 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.support.replication; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; @@ -49,10 +50,14 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; @@ -63,12 +68,16 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.cluster.ClusterStateChanges; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.MockTcpTransport; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -82,10 +91,13 @@ import org.junit.Before; import org.junit.BeforeClass; import java.io.IOException; +import java.net.UnknownHostException; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -918,6 +930,61 @@ public class TransportReplicationActionTests extends ESTestCase { assertConcreteShardRequest(capturedRequest.request, request, replica.allocationId()); } + public void testRetryOnReplicaWithRealTransport() throws Exception { + final ShardId shardId = new ShardId("test", "_na_", 0); + final ClusterState initialState = state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED); + final ShardRouting replica = initialState.getRoutingTable().shardRoutingTable(shardId).replicaShards().get(0); + // simulate execution of the node holding the replica + final ClusterState stateWithNodes = ClusterState.builder(initialState) + .nodes(DiscoveryNodes.builder(initialState.nodes()).localNodeId(replica.currentNodeId())).build(); + setState(clusterService, stateWithNodes); + AtomicBoolean throwException = new AtomicBoolean(true); + final ReplicationTask task = maybeTask(); + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); + final Transport transport = new MockTcpTransport(Settings.EMPTY, threadPool, BigArrays.NON_RECYCLING_INSTANCE, + new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Settings.EMPTY, Collections.emptyList()), + Version.CURRENT); + transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> clusterService.localNode(),null); + transportService.start(); + transportService.acceptIncomingRequests(); + + AtomicBoolean calledSuccessfully = new AtomicBoolean(false); + TestAction action = new TestAction(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction, + threadPool) { + @Override + protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) { + assertPhase(task, "replica"); + if (throwException.get()) { + throw new RetryOnReplicaException(shardId, "simulation"); + } + calledSuccessfully.set(true); + return new ReplicaResult(); + } + }; + final TestAction.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); + final PlainActionFuture listener = new PlainActionFuture<>(); + final Request request = new Request().setShardId(shardId); + final long checkpoint = randomNonNegativeLong(); + request.primaryTerm(stateWithNodes.metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id())); + replicaOperationTransportHandler.messageReceived( + new TransportReplicationAction.ConcreteReplicaRequest<>(request, replica.allocationId().getId(), checkpoint), + createTransportChannel(listener), task); + if (listener.isDone()) { + listener.get(); // fail with the exception if there + fail("listener shouldn't be done"); + } + + // release the waiting + throwException.set(false); + // publish a new state (same as the old state with the version incremented) + setState(clusterService, stateWithNodes); + + // Assert that the request was retried, this time successfull + assertTrue("action should have been successfully called on retry but was not", calledSuccessfully.get()); + transportService.stop(); + } + private void assertConcreteShardRequest(TransportRequest capturedRequest, Request expectedRequest, AllocationId expectedAllocationId) { final TransportReplicationAction.ConcreteShardRequest concreteShardRequest = (TransportReplicationAction.ConcreteShardRequest) capturedRequest;