diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 38972a7f774..e030fa4f151 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -235,39 +235,9 @@ public abstract class TransportReplicationAction< return TransportRequestOptions.EMPTY; } - private String concreteIndex(final ClusterState state, final ReplicationRequest request) { - return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index(); - } - - private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) { - ClusterBlockLevel globalBlockLevel = globalBlockLevel(); - if (globalBlockLevel != null) { - ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel); - if (blockException != null) { - return blockException; - } - } - ClusterBlockLevel indexBlockLevel = indexBlockLevel(); - if (indexBlockLevel != null) { - ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, indexName); - if (blockException != null) { - return blockException; - } - } - return null; - } - protected boolean retryPrimaryException(final Throwable e) { return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class - || TransportActions.isShardNotAvailableException(e) - || isRetryableClusterBlockException(e); - } - - boolean isRetryableClusterBlockException(final Throwable e) { - if (e instanceof ClusterBlockException) { - return ((ClusterBlockException) e).retryable(); - } - return false; + || TransportActions.isShardNotAvailableException(e); } protected class OperationTransportHandler implements TransportRequestHandler { @@ -340,15 +310,6 @@ public abstract class TransportReplicationAction< @Override public void onResponse(PrimaryShardReference primaryShardReference) { try { - final ClusterState clusterState = clusterService.state(); - final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index()); - - final ClusterBlockException blockException = blockExceptions(clusterState, indexMetaData.getIndex().getName()); - if (blockException != null) { - logger.trace("cluster is blocked, action failed on primary", blockException); - throw blockException; - } - if (primaryShardReference.isRelocated()) { primaryShardReference.close(); // release shard operation lock as soon as possible setPhase(replicationTask, "primary_delegation"); @@ -362,7 +323,7 @@ public abstract class TransportReplicationAction< response.readFrom(in); return response; }; - DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId()); + DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm), transportOptions, @@ -735,42 +696,35 @@ public abstract class TransportReplicationAction< protected void doRun() { setPhase(task, "routing"); final ClusterState state = observer.setAndGetObservedState(); - final String concreteIndex = concreteIndex(state, request); - final ClusterBlockException blockException = blockExceptions(state, concreteIndex); - if (blockException != null) { - if (blockException.retryable()) { - logger.trace("cluster is blocked, scheduling a retry", blockException); - retry(blockException); - } else { - finishAsFailed(blockException); - } + if (handleBlockExceptions(state)) { + return; + } + + // request does not have a shardId yet, we need to pass the concrete index to resolve shardId + final String concreteIndex = concreteIndex(state); + final IndexMetaData indexMetaData = state.metaData().index(concreteIndex); + if (indexMetaData == null) { + retry(new IndexNotFoundException(concreteIndex)); + return; + } + if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { + throw new IndexClosedException(indexMetaData.getIndex()); + } + + // resolve all derived request fields, so we can route and apply it + resolveRequest(indexMetaData, request); + assert request.shardId() != null : "request shardId must be set in resolveRequest"; + assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest"; + + final ShardRouting primary = primary(state); + if (retryIfUnavailable(state, primary)) { + return; + } + final DiscoveryNode node = state.nodes().get(primary.currentNodeId()); + if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { + performLocalAction(state, primary, node, indexMetaData); } else { - // request does not have a shardId yet, we need to pass the concrete index to resolve shardId - final IndexMetaData indexMetaData = state.metaData().index(concreteIndex); - if (indexMetaData == null) { - retry(new IndexNotFoundException(concreteIndex)); - return; - } - if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { - throw new IndexClosedException(indexMetaData.getIndex()); - } - - // resolve all derived request fields, so we can route and apply it - resolveRequest(indexMetaData, request); - assert request.shardId() != null : "request shardId must be set in resolveRequest"; - assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : - "request waitForActiveShards must be set in resolveRequest"; - - final ShardRouting primary = primary(state); - if (retryIfUnavailable(state, primary)) { - return; - } - final DiscoveryNode node = state.nodes().get(primary.currentNodeId()); - if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { - performLocalAction(state, primary, node, indexMetaData); - } else { - performRemoteAction(state, primary, node); - } + performRemoteAction(state, primary, node); } } @@ -822,11 +776,44 @@ public abstract class TransportReplicationAction< return false; } + private String concreteIndex(ClusterState state) { + return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index(); + } + private ShardRouting primary(ClusterState state) { IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId()); return indexShard.primaryShard(); } + private boolean handleBlockExceptions(ClusterState state) { + ClusterBlockLevel globalBlockLevel = globalBlockLevel(); + if (globalBlockLevel != null) { + ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel); + if (blockException != null) { + handleBlockException(blockException); + return true; + } + } + ClusterBlockLevel indexBlockLevel = indexBlockLevel(); + if (indexBlockLevel != null) { + ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, concreteIndex(state)); + if (blockException != null) { + handleBlockException(blockException); + return true; + } + } + return false; + } + + private void handleBlockException(ClusterBlockException blockException) { + if (blockException.retryable()) { + logger.trace("cluster is blocked, scheduling a retry", blockException); + retry(blockException); + } else { + finishAsFailed(blockException); + } + } + private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction, final TransportRequest requestToPerform) { transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler() { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index c8c40a7f584..aeda5f1c3fa 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -89,7 +89,6 @@ import org.junit.BeforeClass; import java.io.IOException; import java.util.Collections; -import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Locale; @@ -101,7 +100,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static java.util.Collections.singleton; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS; @@ -110,11 +108,9 @@ import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.core.Is.is; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; @@ -186,157 +182,70 @@ public class TransportReplicationActionTests extends ESTestCase { threadPool = null; } - private T assertListenerThrows(String msg, PlainActionFuture listener, Class klass) { - ExecutionException exception = expectThrows(ExecutionException.class, msg, listener::get); - assertThat(exception.getCause(), instanceOf(klass)); - @SuppressWarnings("unchecked") - final T cause = (T) exception.getCause(); - return cause; - } - - private void setStateWithBlock(final ClusterService clusterService, final ClusterBlock block, final boolean globalBlock) { - final ClusterBlocks.Builder blocks = ClusterBlocks.builder(); - if (globalBlock) { - blocks.addGlobalBlock(block); - } else { - blocks.addIndexBlock("index", block); + void assertListenerThrows(String msg, PlainActionFuture listener, Class klass) throws InterruptedException { + try { + listener.get(); + fail(msg); + } catch (ExecutionException ex) { + assertThat(ex.getCause(), instanceOf(klass)); } - setState(clusterService, ClusterState.builder(clusterService.state()).blocks(blocks).build()); } - public void testBlocksInReroutePhase() throws Exception { - final ClusterBlock nonRetryableBlock = - new ClusterBlock(1, "non retryable", false, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL); - final ClusterBlock retryableBlock = - new ClusterBlock(1, "retryable", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL); - - final boolean globalBlock = randomBoolean(); - final TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks", - transportService, clusterService, shardStateAction, threadPool) { + public void testBlocks() throws ExecutionException, InterruptedException { + Request request = new Request(); + PlainActionFuture listener = new PlainActionFuture<>(); + ReplicationTask task = maybeTask(); + TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks", + transportService, clusterService, shardStateAction, threadPool) { @Override protected ClusterBlockLevel globalBlockLevel() { - return globalBlock ? ClusterBlockLevel.WRITE : null; - } - - @Override - protected ClusterBlockLevel indexBlockLevel() { - return globalBlock == false ? ClusterBlockLevel.WRITE : null; + return ClusterBlockLevel.WRITE; } }; - setState(clusterService, ClusterStateCreationUtils.stateWithActivePrimary("index", true, 0)); - - { - setStateWithBlock(clusterService, nonRetryableBlock, globalBlock); - - Request request = globalBlock ? new Request() : new Request().index("index"); - PlainActionFuture listener = new PlainActionFuture<>(); - ReplicationTask task = maybeTask(); - - TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); - reroutePhase.run(); - - ClusterBlockException exception = - assertListenerThrows("primary action should fail operation", listener, ClusterBlockException.class); - assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(nonRetryableBlock)); - assertPhase(task, "failed"); - } - { - setStateWithBlock(clusterService, retryableBlock, globalBlock); - - Request requestWithTimeout = (globalBlock ? new Request() : new Request().index("index")).timeout("5ms"); - PlainActionFuture listener = new PlainActionFuture<>(); - ReplicationTask task = maybeTask(); - - TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, requestWithTimeout, listener); - reroutePhase.run(); - - ClusterBlockException exception = - assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class); - assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(retryableBlock)); - assertPhase(task, "failed"); - assertTrue(requestWithTimeout.isRetrySet.get()); - } - { - setStateWithBlock(clusterService, retryableBlock, globalBlock); - - Request request = globalBlock ? new Request() : new Request().index("index"); - PlainActionFuture listener = new PlainActionFuture<>(); - ReplicationTask task = maybeTask(); - - TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); - reroutePhase.run(); - - assertFalse("primary phase should wait on retryable block", listener.isDone()); - assertPhase(task, "waiting_for_retry"); - assertTrue(request.isRetrySet.get()); - - setStateWithBlock(clusterService, nonRetryableBlock, globalBlock); - - ClusterBlockException exception = assertListenerThrows("primary phase should fail operation when moving from a retryable " + - "block to a non-retryable one", listener, ClusterBlockException.class); - assertThat(((ClusterBlockException) exception.unwrapCause()).blocks().iterator().next(), is(nonRetryableBlock)); - assertIndexShardUninitialized(); - } - { - Request requestWithTimeout = new Request().index("unknown").setShardId(new ShardId("unknown", "_na_", 0)).timeout("5ms"); - PlainActionFuture listener = new PlainActionFuture<>(); - ReplicationTask task = maybeTask(); - - TestAction testActionWithNoBlocks = new TestAction(Settings.EMPTY, "internal:testActionWithNoBlocks", transportService, - clusterService, shardStateAction, threadPool); - listener = new PlainActionFuture<>(); - TestAction.ReroutePhase reroutePhase = testActionWithNoBlocks.new ReroutePhase(task, requestWithTimeout, listener); - reroutePhase.run(); - assertListenerThrows("should fail with an IndexNotFoundException when no blocks", listener, IndexNotFoundException.class); - } - } - - public void testBlocksInPrimaryAction() { - final boolean globalBlock = randomBoolean(); - - final TestAction actionWithBlocks = - new TestAction(Settings.EMPTY, "internal:actionWithBlocks", transportService, clusterService, shardStateAction, threadPool) { - @Override - protected ClusterBlockLevel globalBlockLevel() { - return globalBlock ? ClusterBlockLevel.WRITE : null; - } - - @Override - protected ClusterBlockLevel indexBlockLevel() { - return globalBlock == false ? ClusterBlockLevel.WRITE : null; - } - }; - - final String index = "index"; - final ShardId shardId = new ShardId(index, "_na_", 0); - setState(clusterService, stateWithActivePrimary(index, true, randomInt(5))); - - final ClusterBlocks.Builder block = ClusterBlocks.builder(); - if (globalBlock) { - block.addGlobalBlock(new ClusterBlock(randomIntBetween(1, 16), "test global block", randomBoolean(), randomBoolean(), - randomBoolean(), RestStatus.BAD_REQUEST, ClusterBlockLevel.ALL)); - } else { - block.addIndexBlock(index, new ClusterBlock(randomIntBetween(1, 16), "test index block", randomBoolean(), randomBoolean(), - randomBoolean(), RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE)); - } + ClusterBlocks.Builder block = ClusterBlocks.builder().addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, + false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block)); + TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); + reroutePhase.run(); + assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class); + assertPhase(task, "failed"); - final ClusterState clusterState = clusterService.state(); - final String targetAllocationID = clusterState.getRoutingTable().shardRoutingTable(shardId).primaryShard().allocationId().getId(); - final long primaryTerm = clusterState.metaData().index(index).primaryTerm(shardId.id()); - final Request request = new Request(shardId); - final ReplicationTask task = maybeTask(); - final PlainActionFuture listener = new PlainActionFuture<>(); + block = ClusterBlocks.builder() + .addGlobalBlock(new ClusterBlock(1, "retryable", true, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block)); + listener = new PlainActionFuture<>(); + reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener); + reroutePhase.run(); + assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class); + assertPhase(task, "failed"); + assertFalse(request.isRetrySet.get()); - final TransportReplicationAction.AsyncPrimaryAction asyncPrimaryActionWithBlocks = - actionWithBlocks.new AsyncPrimaryAction(request, targetAllocationID, primaryTerm, createTransportChannel(listener), task); - asyncPrimaryActionWithBlocks.run(); + listener = new PlainActionFuture<>(); + reroutePhase = action.new ReroutePhase(task, request = new Request(), listener); + reroutePhase.run(); + assertFalse("primary phase should wait on retryable block", listener.isDone()); + assertPhase(task, "waiting_for_retry"); + assertTrue(request.isRetrySet.get()); - final ExecutionException exception = expectThrows(ExecutionException.class, listener::get); - assertThat(exception.getCause(), instanceOf(ClusterBlockException.class)); - assertThat(exception.getCause(), hasToString(containsString("test " + (globalBlock ? "global" : "index") + " block"))); - assertPhase(task, "finished"); + block = ClusterBlocks.builder().addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, false, + RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); + setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block)); + assertListenerThrows("primary phase should fail operation when moving from a retryable block to a non-retryable one", listener, + ClusterBlockException.class); + assertIndexShardUninitialized(); + + action = new TestAction(Settings.EMPTY, "internal:testActionWithNoBlocks", transportService, clusterService, shardStateAction, + threadPool) { + @Override + protected ClusterBlockLevel globalBlockLevel() { + return null; + } + }; + listener = new PlainActionFuture<>(); + reroutePhase = action.new ReroutePhase(task, new Request().timeout("5ms"), listener); + reroutePhase.run(); + assertListenerThrows("should fail with an IndexNotFoundException when no blocks checked", listener, IndexNotFoundException.class); } public void assertIndexShardUninitialized() { @@ -468,12 +377,21 @@ public class TransportReplicationActionTests extends ESTestCase { PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); + ClusterBlockLevel indexBlockLevel = randomBoolean() ? ClusterBlockLevel.WRITE : null; TestAction action = new TestAction(Settings.EMPTY, "internal:testActionWithBlocks", transportService, - clusterService, shardStateAction, threadPool); + clusterService, shardStateAction, threadPool) { + @Override + protected ClusterBlockLevel indexBlockLevel() { + return indexBlockLevel; + } + }; TestAction.ReroutePhase reroutePhase = action.new ReroutePhase(task, request, listener); reroutePhase.run(); - assertListenerThrows("must throw index closed exception", listener, IndexClosedException.class); - + if (indexBlockLevel == ClusterBlockLevel.WRITE) { + assertListenerThrows("must throw block exception", listener, ClusterBlockException.class); + } else { + assertListenerThrows("must throw index closed exception", listener, IndexClosedException.class); + } assertPhase(task, "failed"); assertFalse(request.isRetrySet.get()); } @@ -764,12 +682,12 @@ public class TransportReplicationActionTests extends ESTestCase { PlainActionFuture listener = new PlainActionFuture<>(); - final IndexShard shard = mockIndexShard(shardId, clusterService); + final IndexShard shard = mock(IndexShard.class); when(shard.getPendingPrimaryTerm()).thenReturn(primaryTerm); when(shard.routingEntry()).thenReturn(routingEntry); when(shard.isRelocatedPrimary()).thenReturn(false); IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId); - Set inSyncIds = randomBoolean() ? singleton(routingEntry.allocationId().getId()) : + Set inSyncIds = randomBoolean() ? Collections.singleton(routingEntry.allocationId().getId()) : clusterService.state().metaData().index(index).inSyncAllocationIds(0); when(shard.getReplicationGroup()).thenReturn( new ReplicationGroup(shardRoutingTable, @@ -1104,17 +1022,6 @@ public class TransportReplicationActionTests extends ESTestCase { transportService.stop(); } - public void testIsRetryableClusterBlockException() { - final TestAction action = new TestAction(Settings.EMPTY, "internal:testIsRetryableClusterBlockException", transportService, - clusterService, shardStateAction, threadPool); - assertFalse(action.isRetryableClusterBlockException(randomRetryPrimaryException(new ShardId("index", "_na_", 0)))); - - final boolean retryable = randomBoolean(); - ClusterBlock randomBlock = new ClusterBlock(randomIntBetween(1, 16), "test", retryable, randomBoolean(), - randomBoolean(), randomFrom(RestStatus.values()), EnumSet.of(randomFrom(ClusterBlockLevel.values()))); - assertEquals(retryable, action.isRetryableClusterBlockException(new ClusterBlockException(singleton(randomBlock)))); - } - private void assertConcreteShardRequest(TransportRequest capturedRequest, Request expectedRequest, AllocationId expectedAllocationId) { final TransportReplicationAction.ConcreteShardRequest concreteShardRequest = (TransportReplicationAction.ConcreteShardRequest) capturedRequest; @@ -1208,6 +1115,15 @@ public class TransportReplicationActionTests extends ESTestCase { Request::new, Request::new, ThreadPool.Names.SAME); } + TestAction(Settings settings, String actionName, TransportService transportService, + ClusterService clusterService, ShardStateAction shardStateAction, + ThreadPool threadPool, boolean withDocumentFailureOnPrimary, boolean withDocumentFailureOnReplica) { + super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, + shardStateAction, + new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), + Request::new, Request::new, ThreadPool.Names.SAME); + } + @Override protected TestResponse newResponseInstance() { return new TestResponse(); @@ -1267,7 +1183,6 @@ public class TransportReplicationActionTests extends ESTestCase { private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) { final IndexShard indexShard = mock(IndexShard.class); - when(indexShard.shardId()).thenReturn(shardId); doAnswer(invocation -> { ActionListener callback = (ActionListener) invocation.getArguments()[0]; count.incrementAndGet();