From 7bf83ff924bf7a3102aeb66780800035954e3d01 Mon Sep 17 00:00:00 2001 From: Britta Weber Date: Fri, 24 Apr 2015 16:23:51 +0200 Subject: [PATCH] ref count write operations on IndexShard This commit adds a counter for IndexShard that keeps track of how many write operations are currently in flight on a shard. The counter is incremented whenever a write request is submitted in TransportShardReplicationOperationAction and decremented when it is finished. On a primary it stays incremented while replicas are being processed. The counter is an instance of AbstractRefCounted. Once this counter reaches 0, each write operation will be rejected with an IndexShardClosedException. closes #10610 --- ...nsportShardReplicationOperationAction.java | 110 +++++-- .../util/concurrent/AbstractRefCounted.java | 6 +- .../elasticsearch/index/shard/IndexShard.java | 75 ++++- .../ShardReplicationOperationTests.java | 300 +++++++++++++++++- .../index/shard/IndexShardTests.java | 56 +++- .../test/ElasticsearchIntegrationTest.java | 1 + .../test/InternalTestCluster.java | 52 +-- .../org/elasticsearch/test/TestCluster.java | 6 + 8 files changed, 516 insertions(+), 90 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index efdc5ab0d4c..c45a3798318 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.support.replication; -import org.elasticsearch.ElasticsearchException; +import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionWriteResponse; @@ -36,17 +36,16 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.RefCounted; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.VersionConflictEngineException; @@ -57,21 +56,15 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.BaseTransportResponseHandler; -import 
org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportRequestOptions; -import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.*; +import java.io.Closeable; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** */ @@ -103,7 +96,7 @@ public abstract class TransportShardReplicationOperationAction listener) { @@ -394,7 +393,9 @@ public abstract class TransportShardReplicationOperationAction primaryResponse = shardOperationOnPrimary(observer.observedState(), por); logger.trace("operation completed on primary [{}]", primary); - replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener); + replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference); } catch (Throwable e) { internalRequest.request.setCanHaveDuplicates(); // shard has not been allocated yet, retry it here if (retryPrimaryException(e)) { logger.trace("had an error while performing operation on primary ({}), scheduling a retry.", e.getMessage()); + // We have to close here because when we retry we will get a new reference on the index shard again and we do not want to + // increment twice. + Releasables.close(indexShardReference); + // We have to reset to null here because when we retry it might be that we never get to the point where we assign a new reference + // (for example, in case the operation was rejected because the queue is full). In this case we would release again once one of the finish methods is called. 
+ indexShardReference = null; retry(e); return; } @@ -614,6 +628,12 @@ public abstract class TransportShardReplicationOperationAction listener) { + InternalRequest internalRequest, ActionListener listener, Releasable indexShardReference) { this.replicaRequest = replicaRequest; this.listener = listener; this.finalResponse = finalResponse; this.originalPrimaryShard = originalPrimaryShard; this.observer = observer; indexMetaData = observer.observedState().metaData().index(internalRequest.concreteIndex()); + this.indexShardReference = indexShardReference; ShardRouting shard; // we double check on the state, if it got changed we need to make sure we take the latest one cause @@ -742,17 +766,23 @@ public abstract class TransportShardReplicationOperationAction recoveredTypes = internalPerformTranslogRecovery(true); - assert recoveredTypes.isEmpty(); - assert recoveryState.getTranslog().recoveredOperations() == 0; + assert engineUnsafe() == null : "engine was already created"; + Map recoveredTypes = internalPerformTranslogRecovery(true); + assert recoveredTypes.isEmpty(); + assert recoveryState.getTranslog().recoveredOperations() == 0; } - /** called if recovery has to be restarted after network error / delay ** */ + /** + * called if recovery has to be restarted after network error / delay ** + */ public void performRecoveryRestart() throws IOException { synchronized (mutex) { if (state != IndexShardState.RECOVERING) { @@ -850,7 +863,9 @@ public class IndexShard extends AbstractIndexShardComponent { } } - /** returns stats about ongoing recoveries, both source and target */ + /** + * returns stats about ongoing recoveries, both source and target + */ public RecoveryStats recoveryStats() { return recoveryStats; } @@ -999,6 +1014,7 @@ public class IndexShard extends AbstractIndexShardComponent { } MetaDataStateFormat.deleteMetaState(shardPath().getDataPath()); } + public ShardPath shardPath() { return path; } @@ -1098,7 +1114,9 @@ public class IndexShard extends AbstractIndexShardComponent { }); } - /** Schedules another (future) refresh, if refresh_interval is still enabled. */ + /** + * Schedules another (future) refresh, if refresh_interval is still enabled. + */ private void reschedule() { synchronized (mutex) { if (state != IndexShardState.CLOSED && refreshInterval.millis() > 0) { @@ -1293,4 +1311,37 @@ public class IndexShard extends AbstractIndexShardComponent { threadPool, indexingService, indexSettingsService, warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler, mapperAnalyzer, similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.filter(), indexCache.filterPolicy()); } + + private static class IndexShardOperationCounter extends AbstractRefCounted { + final private ESLogger logger; + private final ShardId shardId; + + public IndexShardOperationCounter(ESLogger logger, ShardId shardId) { + super("index-shard-operations-counter"); + this.logger = logger; + this.shardId = shardId; + } + + @Override + protected void closeInternal() { + logger.debug("operations counter reached 0, will not accept any further writes"); + } + + @Override + protected void alreadyClosed() { + throw new IndexShardClosedException(shardId, "could not increment operation counter. 
shard is closed."); + } + } + + public void incrementOperationCounter() { + indexShardOperationCounter.incRef(); + } + + public void decrementOperationCounter() { + indexShardOperationCounter.decRef(); + } + + public int getOperationsCount() { + return indexShardOperationCounter.refCount(); + } } diff --git a/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationOperationTests.java b/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationOperationTests.java index a28c78ccd34..7bd0cf373a5 100644 --- a/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationOperationTests.java +++ b/src/test/java/org/elasticsearch/action/support/replication/ShardReplicationOperationTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.action.support.replication; +import com.google.common.base.Predicate; import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -41,9 +42,11 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.DummyTransportAddress; @@ -55,7 +58,9 @@ import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseOptions; import org.elasticsearch.transport.TransportService; import org.junit.AfterClass; import org.junit.Before; @@ -66,6 +71,7 @@ import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -82,7 +88,10 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { private TransportService transportService; private CapturingTransport transport; private Action action; - + /* * + * TransportShardReplicationOperationAction needs an instance of IndexShard to count operations. + * indexShards is reset to null before each test and will be initialized upon request in the tests. 
+ */ @BeforeClass public static void beforeClass() { @@ -97,6 +106,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { transportService = new TransportService(transport, threadPool); transportService.start(); action = new Action(ImmutableSettings.EMPTY, "testAction", transportService, clusterService, threadPool); + count.set(1); } @AfterClass @@ -105,7 +115,6 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { threadPool = null; } - void assertListenerThrows(String msg, PlainActionFuture listener, Class klass) throws InterruptedException { try { listener.get(); @@ -113,7 +122,6 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { } catch (ExecutionException ex) { assertThat(ex.getCause(), instanceOf(klass)); } - } @Test @@ -145,7 +153,12 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { block = ClusterBlocks.builder() .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block)); - assertListenerThrows("primary phase should fail operation when moving from a retryable block a non-retryable one", listener, ClusterBlockException.class); + assertListenerThrows("primary phase should fail operation when moving from a retryable block to a non-retryable one", listener, ClusterBlockException.class); + assertIndexShardUninitialized(); + } + + public void assertIndexShardUninitialized() { + assertEquals(1, count.get()); } ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int numberOfReplicas) { @@ -163,7 +176,6 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { replicaStates[i] = ShardRoutingState.UNASSIGNED; } return state(index, primaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates); - } ClusterState state(String index, boolean primaryLocal, ShardRoutingState primaryState, ShardRoutingState... 
replicaStates) { @@ -225,7 +237,6 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { } indexShardRoutingBuilder.addShard( new ImmutableShardRouting(index, shardId.id(), replicaNode, relocatingNode, false, replicaState, 0)); - } ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); @@ -268,6 +279,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { listener.get(); assertTrue("request wasn't processed on primary, despite it being assigned", request.processedOnPrimary.get()); + assertIndexShardCounter(1); } @Test @@ -290,17 +302,23 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { if (primaryNodeId.equals(clusterService.localNode().id())) { logger.info("--> primary is assigned locally, testing for execution"); assertTrue("request failed to be processed on a local primary", request.processedOnPrimary.get()); + if (transport.capturedRequests().length > 0) { + assertIndexShardCounter(2); + } else { + assertIndexShardCounter(1); + } } else { logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId); final List capturedRequests = transport.capturedRequestsByTargetNode().get(primaryNodeId); assertThat(capturedRequests, notNullValue()); assertThat(capturedRequests.size(), equalTo(1)); assertThat(capturedRequests.get(0).action, equalTo("testAction")); + assertIndexShardUninitialized(); } } @Test - public void testWriteConsistency() { + public void testWriteConsistency() throws ExecutionException, InterruptedException { action = new ActionWithConsistency(ImmutableSettings.EMPTY, "testActionWithConsistency", transportService, clusterService, threadPool); final String index = "test"; final ShardId shardId = new ShardId(index, 0); @@ -348,17 +366,23 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), nullValue()); primaryPhase.run(); assertTrue("operations should have been performed, consistency level is met", request.processedOnPrimary.get()); + if (assignedReplicas > 0) { + assertIndexShardCounter(2); + } else { + assertIndexShardCounter(1); + } } else { assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), notNullValue()); primaryPhase.run(); assertFalse("operations should not have been performed, consistency level is *NOT* met", request.processedOnPrimary.get()); + assertIndexShardUninitialized(); for (int i = 0; i < replicaStates.length; i++) { replicaStates[i] = ShardRoutingState.STARTED; } clusterService.setState(state(index, true, ShardRoutingState.STARTED, replicaStates)); assertTrue("once the consistency level is met, operation should continue", request.processedOnPrimary.get()); + assertIndexShardCounter(2); } - } @Test @@ -407,7 +431,6 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { totalShards++; } } - runReplicateTest(shardRoutingTable, assignedReplicas, totalShards); } @@ -421,13 +444,14 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { logger.debug("expecting [{}] assigned replicas, [{}] total shards. 
using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint()); - final TransportShardReplicationOperationAction.InternalRequest internalRequest = action.new InternalRequest(request); internalRequest.concreteIndex(shardId.index().name()); + Releasable reference = getOrCreateIndexShardOperationsCounter(); + assertIndexShardCounter(2); TransportShardReplicationOperationAction.ReplicationPhase replicationPhase = action.new ReplicationPhase(shardIt, request, new Response(), new ClusterStateObserver(clusterService, logger), - primaryShard, internalRequest, listener); + primaryShard, internalRequest, listener, reference); assertThat(replicationPhase.totalShards(), equalTo(totalShards)); assertThat(replicationPhase.pending(), equalTo(assignedReplicas)); @@ -472,8 +496,158 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { for (CapturingTransport.CapturedRequest capturedRequest : transport.capturedRequests()) { assertThat(capturedRequest.action, equalTo(ShardStateAction.SHARD_FAILED_ACTION_NAME)); } + // all replicas have responded so the counter should be decreased again + assertIndexShardCounter(1); } + @Test + public void testCounterOnPrimary() throws InterruptedException, ExecutionException, IOException { + final String index = "test"; + final ShardId shardId = new ShardId(index, 0); + // no replica, we only want to test on primary + clusterService.setState(state(index, true, + ShardRoutingState.STARTED)); + logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); + Request request = new Request(shardId).timeout("100ms"); + PlainActionFuture listener = new PlainActionFuture<>(); + + /** + * Execute an action that is stuck in shard operation until a latch is counted down. + * That way we can start the operation, check if the counter was incremented and then unblock the operation + * again to see if the counter is decremented afterwards. + * TODO: I could also write an action that asserts that the counter is 2 in the shard operation. + * However, this failure would only become apparent once listener.get is called. Seems a little implicit. 
+ * */ + action = new ActionWithDelay(ImmutableSettings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool); + final TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + Thread t = new Thread() { + public void run() { + primaryPhase.run(); + } + }; + t.start(); + // shard operation should be ongoing, so the counter is at 2 + // we have to wait here because increment happens in thread + awaitBusy(new Predicate<Object>() { + @Override + public boolean apply(@Nullable Object input) { + return (count.get() == 2); + } + }); + + assertIndexShardCounter(2); + assertThat(transport.capturedRequests().length, equalTo(0)); + ((ActionWithDelay) action).countDownLatch.countDown(); + t.join(); + listener.get(); + // operation finished, counter back to 1 + assertIndexShardCounter(1); + assertThat(transport.capturedRequests().length, equalTo(0)); + } + + @Test + public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedException, ExecutionException, IOException { + final String index = "test"; + final ShardId shardId = new ShardId(index, 0); + // one replica to make sure replication is attempted + clusterService.setState(state(index, true, + ShardRoutingState.STARTED, ShardRoutingState.STARTED)); + logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); + Request request = new Request(shardId).timeout("100ms"); + PlainActionFuture listener = new PlainActionFuture<>(); + TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + primaryPhase.run(); + assertIndexShardCounter(2); + assertThat(transport.capturedRequests().length, equalTo(1)); + // try once with successful response + transport.handleResponse(transport.capturedRequests()[0].requestId, TransportResponse.Empty.INSTANCE); + assertIndexShardCounter(1); + transport.clear(); + request = new Request(shardId).timeout("100ms"); + primaryPhase = action.new PrimaryPhase(request, listener); + primaryPhase.run(); + assertIndexShardCounter(2); + assertThat(transport.capturedRequests().length, equalTo(1)); + // try with failure response + transport.handleResponse(transport.capturedRequests()[0].requestId, new CorruptIndexException("simulated", (String) null)); + assertIndexShardCounter(1); + } + + @Test + public void testReplicasCounter() throws Exception { + final ShardId shardId = new ShardId("test", 0); + clusterService.setState(state(shardId.index().getName(), true, + ShardRoutingState.STARTED, ShardRoutingState.STARTED)); + action = new ActionWithDelay(ImmutableSettings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool); + final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler(); + Thread t = new Thread() { + public void run() { + try { + replicaOperationTransportHandler.messageReceived(new Request(), createTransportChannel()); + } catch (Exception e) { + } + } + }; + t.start(); + // shard operation should be ongoing, so the counter is at 2 + // we have to wait here because increment happens in thread + awaitBusy(new Predicate<Object>() { + @Override + public boolean apply(@Nullable Object input) { + return count.get() == 2; + } + }); + ((ActionWithDelay) action).countDownLatch.countDown(); + t.join(); + // operation should have finished and counter decreased because there are no outstanding replica requests + assertIndexShardCounter(1); + // now check if this also works if operation throws 
exception + action = new ActionWithExceptions(ImmutableSettings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool); + final Action.ReplicaOperationTransportHandler replicaOperationTransportHandlerForException = action.new ReplicaOperationTransportHandler(); + try { + replicaOperationTransportHandlerForException.messageReceived(new Request(shardId), createTransportChannel()); + fail(); + } catch (Throwable t2) { + } + assertIndexShardCounter(1); + } + + @Test + public void testCounterDecrementedIfShardOperationThrowsException() throws InterruptedException, ExecutionException, IOException { + action = new ActionWithExceptions(ImmutableSettings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool); + final String index = "test"; + final ShardId shardId = new ShardId(index, 0); + clusterService.setState(state(index, true, + ShardRoutingState.STARTED, ShardRoutingState.STARTED)); + logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); + Request request = new Request(shardId).timeout("100ms"); + PlainActionFuture listener = new PlainActionFuture<>(); + TransportShardReplicationOperationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener); + primaryPhase.run(); + // no replica request should have been sent yet + assertThat(transport.capturedRequests().length, equalTo(0)); + // no matter if the operation is retried or not, counter must be back to 1 + assertIndexShardCounter(1); + } + + private void assertIndexShardCounter(int expected) { + assertThat(count.get(), equalTo(expected)); + } + + private final AtomicInteger count = new AtomicInteger(0); + + /* + * Increments the shared operations counter and returns a Releasable that decrements it again when it is closed. 
+ * */ + private synchronized Releasable getOrCreateIndexShardOperationsCounter() { + count.incrementAndGet(); + return new Releasable() { + @Override + public void close() { + count.decrementAndGet(); + } + }; + } static class Request extends ShardReplicationOperationRequest { int shardId; @@ -481,7 +655,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { public AtomicInteger processedOnReplicas = new AtomicInteger(); Request() { - this.operationThreaded(false); + this.operationThreaded(randomBoolean()); } Request(ShardId shardId) { @@ -505,10 +679,9 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { } static class Response extends ActionWriteResponse { - } - static class Action extends TransportShardReplicationOperationAction { + class Action extends TransportShardReplicationOperationAction { Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, @@ -549,9 +722,14 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { protected boolean resolveIndex() { return false; } + + @Override + protected Releasable getIndexShardOperationsCounter(ShardId shardId) { + return getOrCreateIndexShardOperationsCounter(); + } } - static class ActionWithConsistency extends Action { + class ActionWithConsistency extends Action { ActionWithConsistency(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) { super(settings, actionName, transportService, clusterService, threadPool); @@ -567,5 +745,97 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase { return new DiscoveryNode("node_" + nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT); } + /* + * Throws exceptions when executed. Used for testing if the counter is correctly decremented in case an operation fails. + * */ + class ActionWithExceptions extends Action { + + ActionWithExceptions(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) throws IOException { + super(settings, actionName, transportService, clusterService, threadPool); + } + + @Override + protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable { + return throwException(shardRequest.shardId); + } + + private Tuple throwException(ShardId shardId) { + try { + if (randomBoolean()) { + // throw a generic exception + // for testing on replica this will actually cause an NPE because it will make the shard fail but + // for this we need an IndicesService which is null. 
+ throw new ElasticsearchException("simulated"); + } else { + // throw an exception which will cause retry on primary and be ignored on replica + throw new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING); + } + } catch (Exception e) { + logger.info("throwing ", e); + throw e; + } + } + + @Override + protected void shardOperationOnReplica(ShardId shardId, Request shardRequest) { + throwException(shardRequest.internalShardId); + } + } + + /** + * Delays the operation until countDownLatch is counted down + */ + class ActionWithDelay extends Action { + CountDownLatch countDownLatch = new CountDownLatch(1); + + ActionWithDelay(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) throws IOException { + super(settings, actionName, transportService, clusterService, threadPool); + } + + @Override + protected Tuple shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable { + awaitLatch(); + return new Tuple<>(new Response(), shardRequest.request); + } + + private void awaitLatch() throws InterruptedException { + countDownLatch.await(); + countDownLatch = new CountDownLatch(1); + } + + @Override + protected void shardOperationOnReplica(ShardId shardId, Request shardRequest) { + try { + awaitLatch(); + } catch (InterruptedException e) { + } + } + + } + + /* + * Transport channel that is needed for replica operation testing. + * */ + public TransportChannel createTransportChannel() { + return new TransportChannel() { + + @Override + public String action() { + return null; + } + + @Override + public void sendResponse(TransportResponse response) throws IOException { + } + + @Override + public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { + } + + @Override + public void sendResponse(Throwable error) throws IOException { + } + }; + } } diff --git a/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index e17721a0f00..8464cf8e42a 100644 --- a/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -23,18 +23,23 @@ import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ElasticsearchSingleNodeTest; +import org.junit.Test; +import java.io.Closeable; import java.io.IOException; import java.nio.file.Path; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ExecutionException; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; /** * Simple unit-test IndexShard related operations. 
@@ -95,20 +100,20 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest { IndexShard shard = test.shard(0); ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); assertEquals(getShardStateMetadata(shard), shardStateMetaData); - ShardRouting routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1); + ShardRouting routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version() + 1); shard.updateRoutingEntry(routing, true); shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); assertEquals(shardStateMetaData, getShardStateMetadata(shard)); assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID))); - routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1); + routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version() + 1); shard.updateRoutingEntry(routing, true); shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); assertEquals(shardStateMetaData, getShardStateMetadata(shard)); assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID))); - routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1); + routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version() + 1); shard.updateRoutingEntry(routing, true); shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); assertEquals(shardStateMetaData, getShardStateMetadata(shard)); @@ -122,13 +127,13 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest { assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID))); - shard.updateRoutingEntry(new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1), false); + shard.updateRoutingEntry(new MutableShardRouting(shard.shardRouting, shard.shardRouting.version() + 1), false); shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); assertFalse("shard state persisted despite persist=false", shardStateMetaData.equals(getShardStateMetadata(shard))); assertEquals("shard state persisted despite persist=false", shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID))); - routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1); + routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version() + 1); shard.updateRoutingEntry(routing, true); shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); assertEquals(shardStateMetaData, getShardStateMetadata(shard)); @@ -153,15 +158,13 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest { ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId)); assertEquals(shardStateMetaData, getShardStateMetadata(shard)); - routing = new MutableShardRouting(shard.shardId.index().getName(), shard.shardId.id(), routing.currentNodeId(), routing.primary(), ShardRoutingState.INITIALIZING, shard.shardRouting.version()+1); + routing = new MutableShardRouting(shard.shardId.index().getName(), shard.shardId.id(), routing.currentNodeId(), routing.primary(), 
ShardRoutingState.INITIALIZING, shard.shardRouting.version() + 1); shard.updateRoutingEntry(routing, true); shard.deleteShardState(); assertNull("no shard state expected after delete on initializing", load(logger, env.availableShardPaths(shard.shardId))); - - } ShardStateMetaData getShardStateMetadata(IndexShard shard) { @@ -180,7 +183,7 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest { assertEquals(meta.hashCode(), new ShardStateMetaData(meta.version, meta.primary, meta.indexUUID).hashCode()); assertFalse(meta.equals(new ShardStateMetaData(meta.version, !meta.primary, meta.indexUUID))); - assertFalse(meta.equals(new ShardStateMetaData(meta.version+1, meta.primary, meta.indexUUID))); + assertFalse(meta.equals(new ShardStateMetaData(meta.version + 1, meta.primary, meta.indexUUID))); assertFalse(meta.equals(new ShardStateMetaData(meta.version, !meta.primary, meta.indexUUID + "foo"))); Set hashCodes = new HashSet<>(); for (int i = 0; i < 30; i++) { // just a sanity check that we impl hashcode @@ -191,6 +194,41 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest { } + @Test + public void testDeleteIndexDecreasesCounter() throws InterruptedException, ExecutionException, IOException { + assertAcked(client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); + ensureGreen("test"); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe("test"); + IndexShard indexShard = indexService.shard(0); + client().admin().indices().prepareDelete("test").get(); + assertThat(indexShard.getOperationsCount(), equalTo(0)); + try { + indexShard.incrementOperationCounter(); + fail("we should not be able to increment anymore"); + } catch (IndexShardClosedException e) { + // expected + } + } + + @Test + public void testIndexShardCounter() throws InterruptedException, ExecutionException, IOException { + assertAcked(client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); + ensureGreen("test"); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe("test"); + IndexShard indexShard = indexService.shard(0); + indexShard.incrementOperationCounter(); + assertEquals(2, indexShard.getOperationsCount()); + indexShard.incrementOperationCounter(); + assertEquals(3, indexShard.getOperationsCount()); + indexShard.decrementOperationCounter(); + indexShard.decrementOperationCounter(); + assertEquals(1, indexShard.getOperationsCount()); + + + } + public static ShardStateMetaData load(ESLogger logger, Path... 
shardPaths) throws IOException { return ShardStateMetaData.FORMAT.loadLatestState(logger, shardPaths); } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 109bd030023..10ad832eef2 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -648,6 +648,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase } ensureClusterSizeConsistency(); ensureClusterStateConsistency(); + cluster().beforeIndexDeletion(); cluster().wipe(); // wipe after to make sure we fail in the test that didn't ack the delete if (afterClass || currentClusterScope == Scope.TEST) { cluster().close(); diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 194d54968b2..639b30ede4f 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -26,16 +26,10 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; import com.google.common.base.Predicate; import com.google.common.base.Predicates; -import com.google.common.collect.Collections2; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; +import com.google.common.collect.*; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; - import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -80,7 +74,9 @@ import org.elasticsearch.index.cache.filter.FilterCacheModule; import org.elasticsearch.index.cache.filter.FilterCacheModule.FilterCacheSettings; import org.elasticsearch.index.cache.filter.index.IndexFilterCache; import org.elasticsearch.index.cache.filter.none.NoneFilterCache; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardModule; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStoreModule; import org.elasticsearch.indices.IndicesService; @@ -112,34 +108,19 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Random; -import java.util.Set; -import java.util.TreeMap; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static junit.framework.Assert.fail; -import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY; -import static org.apache.lucene.util.LuceneTestCase.rarely; -import static org.apache.lucene.util.LuceneTestCase.usually; +import static org.apache.lucene.util.LuceneTestCase.*; import static 
org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.node.NodeBuilder.nodeBuilder; import static org.elasticsearch.test.ElasticsearchTestCase.assertBusy; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.*; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -855,6 +836,7 @@ public final class InternalTestCluster extends TestCluster { } public static final String TRANSPORT_CLIENT_PREFIX = "transport_client_"; + static class TransportClientFactory { private final boolean sniff; private final Settings settings; @@ -976,6 +958,26 @@ public final class InternalTestCluster extends TestCluster { randomlyResetClients(); /* reset all clients - each test gets its own client based on the Random instance created above. */ } + @Override + public void beforeIndexDeletion() { + assertShardIndexCounter(); + } + + private void assertShardIndexCounter() { + final Collection nodesAndClients = nodes.values(); + for (NodeAndClient nodeAndClient : nodesAndClients) { + IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); + for (IndexService indexService : indexServices) { + for (IndexShard indexShard : indexService) { + assertThat(indexShard.getOperationsCount(), anyOf(equalTo(1), equalTo(0))); + if (indexShard.getOperationsCount() == 0) { + assertThat(indexShard.state(), equalTo(IndexShardState.CLOSED)); + } + } + } + } + } + private void randomlyResetClients() throws IOException { // only reset the clients on nightly tests, it causes heavy load... if (RandomizedTest.isNightly() && rarely(random)) { diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java index c8d48521b14..a1f5f016a8d 100644 --- a/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/src/test/java/org/elasticsearch/test/TestCluster.java @@ -76,6 +76,12 @@ public abstract class TestCluster implements Iterable, Closeable { wipeRepositories(); } + /** + * Assertions that should run before the cluster is wiped should be called in this method + */ + public void beforeIndexDeletion() { + } + /** * This method checks all the things that need to be checked after each test */
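
To make the mechanism concrete, here is a minimal, self-contained sketch of the counting scheme the patch wires into IndexShard and TransportShardReplicationOperationAction: a reference counter that starts at 1 (the shard holds its own reference until it is closed), is incremented for every in-flight write, and rejects further increments once it has dropped to 0. It assumes only the JDK; SimpleOperationCounter and ShardClosedException are hypothetical stand-ins for the AbstractRefCounted subclass and the IndexShardClosedException used above, not the actual Elasticsearch classes.

import java.util.concurrent.atomic.AtomicInteger;

public class SimpleOperationCounter {

    static class ShardClosedException extends RuntimeException {
        ShardClosedException(String message) {
            super(message);
        }
    }

    // starts at 1: the shard itself holds one reference until it is closed
    private final AtomicInteger refCount = new AtomicInteger(1);

    // called before a write operation starts; rejected once the shard is closed
    public void incRef() {
        while (true) {
            int current = refCount.get();
            if (current <= 0) {
                throw new ShardClosedException("could not increment operation counter, shard is closed");
            }
            // compare-and-set makes the closed-check and the increment atomic
            if (refCount.compareAndSet(current, current + 1)) {
                return;
            }
        }
    }

    // called when the operation, including replication on the primary, has finished
    public void decRef() {
        int updated = refCount.decrementAndGet();
        if (updated == 0) {
            closeInternal();
        }
    }

    public int refCount() {
        return refCount.get();
    }

    protected void closeInternal() {
        System.out.println("operations counter reached 0, will not accept any further writes");
    }

    public static void main(String[] args) {
        SimpleOperationCounter counter = new SimpleOperationCounter();
        counter.incRef();                       // a write request is submitted
        System.out.println(counter.refCount()); // 2: shard reference plus one in-flight write
        counter.decRef();                       // the write (and its replication) finished
        counter.decRef();                       // the shard closes and drops its own reference
        try {
            counter.incRef();                   // further writes are rejected
        } catch (ShardClosedException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}

The compare-and-set loop in incRef is the important detail: checking for a closed shard and incrementing happen atomically, so a write cannot sneak in after the count has reached 0. That is why the patch builds on AbstractRefCounted rather than a bare AtomicInteger increment, and why call sites receive the increment wrapped in a Releasable that is closed on the finish and failure paths (Releasables.close(indexShardReference)) instead of being paired by hand.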