ref count write operations on IndexShard

This commit adds a counter to IndexShard that keeps track of how many write operations
are currently in flight on a shard. The counter is incremented whenever a write request is
submitted in TransportShardReplicationOperationAction and decremented when the operation finishes.
On a primary it stays incremented while the replica operations are in flight.
The counter is an instance of AbstractRefCounted. Once the counter reaches 0 (i.e. the shard
has been closed), any further write operation is rejected with an IndexShardClosedException.

closes #10610
Britta Weber 2015-04-24 16:23:51 +02:00
parent ba68d354c4
commit 7bf83ff924
8 changed files with 516 additions and 90 deletions
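
Before the per-file diffs, here is a minimal, self-contained Java sketch of the pattern the commit introduces, for readers who do not want to reconstruct it from the hunks below. The names OperationCounter, ShardReference and RefCountedWriteSketch are simplified stand-ins invented for this sketch; the real implementation lives in IndexShard (IndexShardOperationCounter, based on AbstractRefCounted) and in TransportShardReplicationOperationAction (IndexShardReference), and rejects writes with IndexShardClosedException rather than the plain IllegalStateException used here.

// Minimal, self-contained sketch (not the actual Elasticsearch classes) of the
// ref-counting pattern introduced by this commit.
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class OperationCounter {
    // Starts at 1 like AbstractRefCounted: that reference belongs to the shard itself
    // and is released when the shard is closed.
    private final AtomicInteger refCount = new AtomicInteger(1);

    void incRef() {
        while (true) {
            int current = refCount.get();
            if (current <= 0) {
                // once the counter has dropped to 0, writes are rejected
                throw new IllegalStateException("shard is closed, rejecting write");
            }
            if (refCount.compareAndSet(current, current + 1)) {
                return;
            }
        }
    }

    void decRef() {
        if (refCount.decrementAndGet() == 0) {
            System.out.println("operations counter reached 0, no further writes accepted");
        }
    }

    int refCount() {
        return refCount.get();
    }
}

// Mirrors IndexShardReference: increments on construction, decrements exactly once on close.
class ShardReference implements AutoCloseable {
    private final OperationCounter counter;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    ShardReference(OperationCounter counter) {
        counter.incRef();
        this.counter = counter;
    }

    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            counter.decRef();
        }
    }
}

public class RefCountedWriteSketch {
    public static void main(String[] args) {
        OperationCounter counter = new OperationCounter();

        // A write operation holds a reference for its whole lifetime, the way the replica
        // handler wraps shardOperationOnReplica in a try-with-resources block.
        try (ShardReference ref = new ShardReference(counter)) {
            System.out.println("in-flight refCount = " + counter.refCount()); // prints 2
            // ... perform the indexing operation here ...
        }

        // Closing the shard drops the initial reference; any later write is rejected.
        counter.decRef();
        try (ShardReference ref = new ShardReference(counter)) {
            System.out.println("unreachable");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}

The key design point is visible at the end of the sketch: the shard itself owns the initial reference, so closing the shard only stops new writes once every in-flight operation has released its own reference.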

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
@ -36,17 +36,16 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
@ -57,21 +56,15 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.*;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
@ -103,7 +96,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
this.checkWriteConsistency = checkWriteConsistency();
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
// we must never reject on because of thread pool capacity on replicas
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction, replicaRequest, executor, true, new ReplicaOperationTransportHandler());
this.transportOptions = transportOptions();
@ -283,7 +276,12 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
@Override
protected void doRun() throws Exception {
shardOperationOnReplica(request.internalShardId, request);
try (Releasable shardReference = getIndexShardOperationsCounter(request.internalShardId)) {
shardOperationOnReplica(request.internalShardId, request);
} catch (Throwable t) {
failReplicaIfNeeded(request.internalShardId.index().name(), request.internalShardId.id(), t);
throw t;
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
@ -309,6 +307,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
private final InternalRequest internalRequest;
private final ClusterStateObserver observer;
private final AtomicBoolean finished = new AtomicBoolean(false);
private volatile Releasable indexShardReference;
PrimaryPhase(Request request, ActionListener<Response> listener) {
@ -394,7 +393,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return null;
}
/** send the request to the node holding the primary or execute if local */
/**
* send the request to the node holding the primary or execute if local
*/
protected void routeRequestOrPerformLocally(final ShardRouting primary, final ShardIterator shardsIt) {
if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) {
try {
@ -487,7 +488,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
});
}
/** upon success, finish the first phase and transfer responsibility to the {@link ReplicationPhase} */
/**
* upon success, finish the first phase and transfer responsibility to the {@link ReplicationPhase}
*/
void finishAndMoveToReplication(ReplicationPhase replicationPhase) {
if (finished.compareAndSet(false, true)) {
replicationPhase.run();
@ -499,6 +502,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
void finishAsFailed(Throwable failure) {
if (finished.compareAndSet(false, true)) {
Releasables.close(indexShardReference);
logger.trace("operation failed", failure);
listener.onFailure(failure);
} else {
@ -509,6 +513,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
void finishWithUnexpectedFailure(Throwable failure) {
logger.warn("unexpected error during the primary phase for action [{}]", failure, actionName);
if (finished.compareAndSet(false, true)) {
Releasables.close(indexShardReference);
listener.onFailure(failure);
} else {
assert false : "finishWithUnexpectedFailure called but operation is already finished";
@ -524,7 +529,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
/** perform the operation on the node holding the primary */
/**
* perform the operation on the node holding the primary
*/
void performOnPrimary(final ShardRouting primary, final ShardIterator shardsIt) {
final String writeConsistencyFailure = checkWriteConsistency(primary);
if (writeConsistencyFailure != null) {
@ -533,15 +540,22 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
final ReplicationPhase replicationPhase;
try {
indexShardReference = getIndexShardOperationsCounter(primary.shardId());
PrimaryOperationRequest por = new PrimaryOperationRequest(primary.id(), internalRequest.concreteIndex(), internalRequest.request());
Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(observer.observedState(), por);
logger.trace("operation completed on primary [{}]", primary);
replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener);
replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference);
} catch (Throwable e) {
internalRequest.request.setCanHaveDuplicates();
// shard has not been allocated yet, retry it here
if (retryPrimaryException(e)) {
logger.trace("had an error while performing operation on primary ({}), scheduling a retry.", e.getMessage());
// We have to close here because when we retry we will get a new reference on the index shard and we do not want to
// increment the counter twice.
Releasables.close(indexShardReference);
// We have to reset to null here because when we retry it might be that we never get to the point where we assign a new reference
// (for example, in case the operation was rejected because the queue is full). Otherwise we would release it again once one of the finish methods is called.
indexShardReference = null;
retry(e);
return;
}
@ -614,6 +628,12 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
protected Releasable getIndexShardOperationsCounter(ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.index().getName());
IndexShard indexShard = indexService.shardSafe(shardId.id());
return new IndexShardReference(indexShard);
}
private void failReplicaIfNeeded(String index, int shardId, Throwable t) {
logger.trace("failure on replica [{}][{}]", t, index, shardId);
if (ignoreReplicaException(t) == false) {
@ -631,7 +651,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
/** inner class is responsible for send the requests to all replica shards and manage the responses */
/**
* inner class is responsible for sending the requests to all replica shards and managing the responses
*/
final class ReplicationPhase extends AbstractRunnable {
private final ReplicaRequest replicaRequest;
@ -646,6 +668,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
private final AtomicInteger pending;
private final int totalShards;
private final ClusterStateObserver observer;
private final Releasable indexShardReference;
/**
* the constructor doesn't take any action, just calculates state. Call {@link #run()} to start
@ -653,13 +676,14 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
*/
public ReplicationPhase(ShardIterator originalShardIt, ReplicaRequest replicaRequest, Response finalResponse,
ClusterStateObserver observer, ShardRouting originalPrimaryShard,
InternalRequest internalRequest, ActionListener<Response> listener) {
InternalRequest internalRequest, ActionListener<Response> listener, Releasable indexShardReference) {
this.replicaRequest = replicaRequest;
this.listener = listener;
this.finalResponse = finalResponse;
this.originalPrimaryShard = originalPrimaryShard;
this.observer = observer;
indexMetaData = observer.observedState().metaData().index(internalRequest.concreteIndex());
this.indexShardReference = indexShardReference;
ShardRouting shard;
// we double check on the state, if it got changed we need to make sure we take the latest one cause
@ -742,17 +766,23 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
this.pending = new AtomicInteger(numberOfPendingShardInstances);
}
/** total shard copies */
/**
* total shard copies
*/
int totalShards() {
return totalShards;
}
/** total successful operations so far */
/**
* total successful operations so far
*/
int successful() {
return success.get();
}
/** number of pending operations */
/**
* number of pending operations
*/
int pending() {
return pending.get();
}
@ -763,7 +793,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
forceFinishAsFailed(t);
}
/** start sending current requests to replicas */
/**
* start sending current requests to replicas
*/
@Override
protected void doRun() {
if (pending.get() == 0) {
@ -798,7 +830,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
/** send operation to the given node or perform it if local */
/**
* send operation to the given node or perform it if local
*/
void performOnReplica(final ShardRouting shard, final String nodeId) {
// if we don't have that node, it means that it might have failed and will be created again, in
// this case, we don't have to do the operation, and just let it failover
@ -894,12 +928,14 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
private void forceFinishAsFailed(Throwable t) {
if (finished.compareAndSet(false, true)) {
Releasables.close(indexShardReference);
listener.onFailure(t);
}
}
private void doFinish() {
if (finished.compareAndSet(false, true)) {
Releasables.close(indexShardReference);
final ShardId shardId = shardIt.shardId();
final ActionWriteResponse.ShardInfo.Failure[] failuresArray;
if (!shardReplicaFailures.isEmpty()) {
@ -950,4 +986,22 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return concreteIndex;
}
}
static class IndexShardReference implements Releasable {
final private IndexShard counter;
private final AtomicBoolean closed = new AtomicBoolean(false);
IndexShardReference(IndexShard counter) {
counter.incrementOperationCounter();
this.counter = counter;
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
counter.decrementOperationCounter();
}
}
}
}

View File

@ -38,7 +38,7 @@ public abstract class AbstractRefCounted implements RefCounted {
@Override
public final void incRef() {
if (tryIncRef() == false) {
throw new AlreadyClosedException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]");
alreadyClosed();
}
}
@ -66,6 +66,10 @@ public abstract class AbstractRefCounted implements RefCounted {
}
protected void alreadyClosed() {
throw new AlreadyClosedException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]");
}
/**
* Returns the current reference count.
*/

View File

@ -48,13 +48,16 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.index.IndexService;
@ -204,13 +207,17 @@ public class IndexShard extends AbstractIndexShardComponent {
public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
private final ShardPath path;
private final IndexShardOperationCounter indexShardOperationCounter;
@Inject
public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService,
ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService,
ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService,
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache,
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService,
ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache,
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory,
ClusterService clusterService, NodeEnvironment nodeEnv, ShardPath path) {
ClusterService clusterService, NodeEnvironment nodeEnv, ShardPath path) {
super(shardId, indexSettingsService.getSettings());
this.codecService = codecService;
this.warmer = warmer;
@ -260,6 +267,7 @@ public class IndexShard extends AbstractIndexShardComponent {
this.checkIndexOnStartup = indexSettings.get("index.shard.check_on_startup", "false");
this.engineConfig = newEngineConfig();
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
}
public Store store() {
@ -703,7 +711,7 @@ public class IndexShard extends AbstractIndexShardComponent {
logger.trace("optimize with {}", optimize);
}
engine().forceMerge(optimize.flush(), optimize.maxNumSegments(), optimize.onlyExpungeDeletes(),
optimize.upgrade(), optimize.upgradeOnlyAncientSegments());
optimize.upgrade(), optimize.upgradeOnlyAncientSegments());
}
public SnapshotIndexCommit snapshotIndex() throws EngineException {
@ -746,6 +754,7 @@ public class IndexShard extends AbstractIndexShardComponent {
mergeScheduleFuture = null;
}
changeState(IndexShardState.CLOSED, reason);
indexShardOperationCounter.decRef();
} finally {
final Engine engine = this.currentEngineReference.getAndSet(null);
try {
@ -777,7 +786,9 @@ public class IndexShard extends AbstractIndexShardComponent {
return this;
}
/** called before starting to copy index files over */
/**
* called before starting to copy index files over
*/
public void prepareForIndexRecovery() {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
@ -832,13 +843,15 @@ public class IndexShard extends AbstractIndexShardComponent {
* a remote peer.
*/
public void skipTranslogRecovery() {
assert engineUnsafe() == null : "engine was already created";
Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(true);
assert recoveredTypes.isEmpty();
assert recoveryState.getTranslog().recoveredOperations() == 0;
assert engineUnsafe() == null : "engine was already created";
Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(true);
assert recoveredTypes.isEmpty();
assert recoveryState.getTranslog().recoveredOperations() == 0;
}
/** called if recovery has to be restarted after network error / delay ** */
/**
* called if recovery has to be restarted after network error / delay
*/
public void performRecoveryRestart() throws IOException {
synchronized (mutex) {
if (state != IndexShardState.RECOVERING) {
@ -850,7 +863,9 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
/** returns stats about ongoing recoveries, both source and target */
/**
* returns stats about ongoing recoveries, both source and target
*/
public RecoveryStats recoveryStats() {
return recoveryStats;
}
@ -999,6 +1014,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
MetaDataStateFormat.deleteMetaState(shardPath().getDataPath());
}
public ShardPath shardPath() {
return path;
}
@ -1098,7 +1114,9 @@ public class IndexShard extends AbstractIndexShardComponent {
});
}
/** Schedules another (future) refresh, if refresh_interval is still enabled. */
/**
* Schedules another (future) refresh, if refresh_interval is still enabled.
*/
private void reschedule() {
synchronized (mutex) {
if (state != IndexShardState.CLOSED && refreshInterval.millis() > 0) {
@ -1293,4 +1311,37 @@ public class IndexShard extends AbstractIndexShardComponent {
threadPool, indexingService, indexSettingsService, warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler,
mapperAnalyzer, similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.filter(), indexCache.filterPolicy());
}
private static class IndexShardOperationCounter extends AbstractRefCounted {
final private ESLogger logger;
private final ShardId shardId;
public IndexShardOperationCounter(ESLogger logger, ShardId shardId) {
super("index-shard-operations-counter");
this.logger = logger;
this.shardId = shardId;
}
@Override
protected void closeInternal() {
logger.debug("operations counter reached 0, will not accept any further writes");
}
@Override
protected void alreadyClosed() {
throw new IndexShardClosedException(shardId, "could not increment operation counter. shard is closed.");
}
}
public void incrementOperationCounter() {
indexShardOperationCounter.incRef();
}
public void decrementOperationCounter() {
indexShardOperationCounter.decRef();
}
public int getOperationsCount() {
return indexShardOperationCounter.refCount();
}
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.action.support.replication;
import com.google.common.base.Predicate;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@ -41,9 +42,11 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
@ -55,7 +58,9 @@ import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseOptions;
import org.elasticsearch.transport.TransportService;
import org.junit.AfterClass;
import org.junit.Before;
@ -66,6 +71,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -82,7 +88,10 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
private TransportService transportService;
private CapturingTransport transport;
private Action action;
/**
* TransportShardReplicationOperationAction needs an instance of IndexShard to count operations.
* indexShards is reset to null before each test and will be initialized upon request in the tests.
*/
@BeforeClass
public static void beforeClass() {
@ -97,6 +106,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
transportService = new TransportService(transport, threadPool);
transportService.start();
action = new Action(ImmutableSettings.EMPTY, "testAction", transportService, clusterService, threadPool);
count.set(1);
}
@AfterClass
@ -105,7 +115,6 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
threadPool = null;
}
<T> void assertListenerThrows(String msg, PlainActionFuture<T> listener, Class<?> klass) throws InterruptedException {
try {
listener.get();
@ -113,7 +122,6 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
} catch (ExecutionException ex) {
assertThat(ex.getCause(), instanceOf(klass));
}
}
@Test
@ -145,7 +153,12 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
block = ClusterBlocks.builder()
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
assertListenerThrows("primary phase should fail operation when moving from a retryable block a non-retryable one", listener, ClusterBlockException.class);
assertListenerThrows("primary phase should fail operation when moving from a retryable block to a non-retryable one", listener, ClusterBlockException.class);
assertIndexShardUninitialized();
}
public void assertIndexShardUninitialized() {
assertEquals(1, count.get());
}
ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int numberOfReplicas) {
@ -163,7 +176,6 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
replicaStates[i] = ShardRoutingState.UNASSIGNED;
}
return state(index, primaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates);
}
ClusterState state(String index, boolean primaryLocal, ShardRoutingState primaryState, ShardRoutingState... replicaStates) {
@ -225,7 +237,6 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
}
indexShardRoutingBuilder.addShard(
new ImmutableShardRouting(index, shardId.id(), replicaNode, relocatingNode, false, replicaState, 0));
}
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
@ -268,6 +279,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
listener.get();
assertTrue("request wasn't processed on primary, despite of it being assigned", request.processedOnPrimary.get());
assertIndexShardCounter(1);
}
@Test
@ -290,17 +302,23 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
if (primaryNodeId.equals(clusterService.localNode().id())) {
logger.info("--> primary is assigned locally, testing for execution");
assertTrue("request failed to be processed on a local primary", request.processedOnPrimary.get());
if (transport.capturedRequests().length > 0) {
assertIndexShardCounter(2);
} else {
assertIndexShardCounter(1);
}
} else {
logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId);
final List<CapturingTransport.CapturedRequest> capturedRequests = transport.capturedRequestsByTargetNode().get(primaryNodeId);
assertThat(capturedRequests, notNullValue());
assertThat(capturedRequests.size(), equalTo(1));
assertThat(capturedRequests.get(0).action, equalTo("testAction"));
assertIndexShardUninitialized();
}
}
@Test
public void testWriteConsistency() {
public void testWriteConsistency() throws ExecutionException, InterruptedException {
action = new ActionWithConsistency(ImmutableSettings.EMPTY, "testActionWithConsistency", transportService, clusterService, threadPool);
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
@ -348,17 +366,23 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), nullValue());
primaryPhase.run();
assertTrue("operations should have been perform, consistency level is met", request.processedOnPrimary.get());
if (assignedReplicas > 0) {
assertIndexShardCounter(2);
} else {
assertIndexShardCounter(1);
}
} else {
assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard()), notNullValue());
primaryPhase.run();
assertFalse("operations should not have been perform, consistency level is *NOT* met", request.processedOnPrimary.get());
assertIndexShardUninitialized();
for (int i = 0; i < replicaStates.length; i++) {
replicaStates[i] = ShardRoutingState.STARTED;
}
clusterService.setState(state(index, true, ShardRoutingState.STARTED, replicaStates));
assertTrue("once the consistency level met, operation should continue", request.processedOnPrimary.get());
assertIndexShardCounter(2);
}
}
@Test
@ -407,7 +431,6 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
totalShards++;
}
}
runReplicateTest(shardRoutingTable, assignedReplicas, totalShards);
}
@ -421,13 +444,14 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
logger.debug("expecting [{}] assigned replicas, [{}] total shards. using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint());
final TransportShardReplicationOperationAction<Request, Request, Response>.InternalRequest internalRequest = action.new InternalRequest(request);
internalRequest.concreteIndex(shardId.index().name());
Releasable reference = getOrCreateIndexShardOperationsCounter();
assertIndexShardCounter(2);
TransportShardReplicationOperationAction<Request, Request, Response>.ReplicationPhase replicationPhase =
action.new ReplicationPhase(shardIt, request,
new Response(), new ClusterStateObserver(clusterService, logger),
primaryShard, internalRequest, listener);
primaryShard, internalRequest, listener, reference);
assertThat(replicationPhase.totalShards(), equalTo(totalShards));
assertThat(replicationPhase.pending(), equalTo(assignedReplicas));
@ -472,8 +496,158 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
for (CapturingTransport.CapturedRequest capturedRequest : transport.capturedRequests()) {
assertThat(capturedRequest.action, equalTo(ShardStateAction.SHARD_FAILED_ACTION_NAME));
}
// all replicas have responded so the counter should be decreased again
assertIndexShardCounter(1);
}
@Test
public void testCounterOnPrimary() throws InterruptedException, ExecutionException, IOException {
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
// no replica, we only want to test on primary
clusterService.setState(state(index, true,
ShardRoutingState.STARTED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Request request = new Request(shardId).timeout("100ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
/**
* Execute an action that is stuck in shard operation until a latch is counted down.
* That way we can start the operation, check if the counter was incremented and then unblock the operation
* again to see if the counter is decremented afterwards.
* TODO: I could also write an action that asserts that the counter is 2 in the shard operation.
* However, this failure would only become apparent once listener.get is called. Seems a little implicit.
* */
action = new ActionWithDelay(ImmutableSettings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
final TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
Thread t = new Thread() {
public void run() {
primaryPhase.run();
}
};
t.start();
// shard operation should be ongoing, so the counter is at 2
// we have to wait here because increment happens in thread
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(@Nullable Object input) {
return (count.get() == 2);
}
});
assertIndexShardCounter(2);
assertThat(transport.capturedRequests().length, equalTo(0));
((ActionWithDelay) action).countDownLatch.countDown();
t.join();
listener.get();
// operation finished, counter back to 1
assertIndexShardCounter(1);
assertThat(transport.capturedRequests().length, equalTo(0));
}
@Test
public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedException, ExecutionException, IOException {
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
// one replica to make sure replication is attempted
clusterService.setState(state(index, true,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Request request = new Request(shardId).timeout("100ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
primaryPhase.run();
assertIndexShardCounter(2);
assertThat(transport.capturedRequests().length, equalTo(1));
// try once with successful response
transport.handleResponse(transport.capturedRequests()[0].requestId, TransportResponse.Empty.INSTANCE);
assertIndexShardCounter(1);
transport.clear();
request = new Request(shardId).timeout("100ms");
primaryPhase = action.new PrimaryPhase(request, listener);
primaryPhase.run();
assertIndexShardCounter(2);
assertThat(transport.capturedRequests().length, equalTo(1));
// try with failure response
transport.handleResponse(transport.capturedRequests()[0].requestId, new CorruptIndexException("simulated", (String) null));
assertIndexShardCounter(1);
}
@Test
public void testReplicasCounter() throws Exception {
final ShardId shardId = new ShardId("test", 0);
clusterService.setState(state(shardId.index().getName(), true,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
action = new ActionWithDelay(ImmutableSettings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
Thread t = new Thread() {
public void run() {
try {
replicaOperationTransportHandler.messageReceived(new Request(), createTransportChannel());
} catch (Exception e) {
}
}
};
t.start();
// shard operation should be ongoing, so the counter is at 2
// we have to wait here because increment happens in thread
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(@Nullable Object input) {
return count.get() == 2;
}
});
((ActionWithDelay) action).countDownLatch.countDown();
t.join();
// operation should have finished and counter decreased because no outstanding replica requests
assertIndexShardCounter(1);
// now check if this also works if operation throws exception
action = new ActionWithExceptions(ImmutableSettings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
final Action.ReplicaOperationTransportHandler replicaOperationTransportHandlerForException = action.new ReplicaOperationTransportHandler();
try {
replicaOperationTransportHandlerForException.messageReceived(new Request(shardId), createTransportChannel());
fail();
} catch (Throwable t2) {
}
assertIndexShardCounter(1);
}
@Test
public void testCounterDecrementedIfShardOperationThrowsException() throws InterruptedException, ExecutionException, IOException {
action = new ActionWithExceptions(ImmutableSettings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
final String index = "test";
final ShardId shardId = new ShardId(index, 0);
clusterService.setState(state(index, true,
ShardRoutingState.STARTED, ShardRoutingState.STARTED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Request request = new Request(shardId).timeout("100ms");
PlainActionFuture<Response> listener = new PlainActionFuture<>();
TransportShardReplicationOperationAction<Request, Request, Response>.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, listener);
primaryPhase.run();
// no replica request should have been sent yet
assertThat(transport.capturedRequests().length, equalTo(0));
// no matter if the operation is retried or not, counter must be back to 1
assertIndexShardCounter(1);
}
private void assertIndexShardCounter(int expected) {
assertThat(count.get(), equalTo(expected));
}
private final AtomicInteger count = new AtomicInteger(0);
/*
* Returns testIndexShardOperationsCounter or initializes it if it was not already created in this test run.
* */
private synchronized Releasable getOrCreateIndexShardOperationsCounter() {
count.incrementAndGet();
return new Releasable() {
@Override
public void close() {
count.decrementAndGet();
}
};
}
static class Request extends ShardReplicationOperationRequest<Request> {
int shardId;
@ -481,7 +655,7 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
public AtomicInteger processedOnReplicas = new AtomicInteger();
Request() {
this.operationThreaded(false);
this.operationThreaded(randomBoolean());
}
Request(ShardId shardId) {
@ -505,10 +679,9 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
}
static class Response extends ActionWriteResponse {
}
static class Action extends TransportShardReplicationOperationAction<Request, Request, Response> {
class Action extends TransportShardReplicationOperationAction<Request, Request, Response> {
Action(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService,
@ -549,9 +722,14 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
protected boolean resolveIndex() {
return false;
}
@Override
protected Releasable getIndexShardOperationsCounter(ShardId shardId) {
return getOrCreateIndexShardOperationsCounter();
}
}
static class ActionWithConsistency extends Action {
class ActionWithConsistency extends Action {
ActionWithConsistency(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
super(settings, actionName, transportService, clusterService, threadPool);
@ -567,5 +745,97 @@ public class ShardReplicationOperationTests extends ElasticsearchTestCase {
return new DiscoveryNode("node_" + nodeId, DummyTransportAddress.INSTANCE, Version.CURRENT);
}
/*
* Throws exceptions when executed. Used for testing if the counter is correctly decremented in case an operation fails.
* */
class ActionWithExceptions extends Action {
ActionWithExceptions(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) throws IOException {
super(settings, actionName, transportService, clusterService, threadPool);
}
@Override
protected Tuple<Response, Request> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
return throwException(shardRequest.shardId);
}
private Tuple<Response, Request> throwException(ShardId shardId) {
try {
if (randomBoolean()) {
// throw a generic exception
// for testing on replica this will actually cause an NPE because it will try to fail the shard but
// for this we would need an IndicesService, which is null here.
throw new ElasticsearchException("simulated");
} else {
// throw an exception which will cause retry on primary and be ignored on replica
throw new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING);
}
} catch (Exception e) {
logger.info("throwing ", e);
throw e;
}
}
@Override
protected void shardOperationOnReplica(ShardId shardId, Request shardRequest) {
throwException(shardRequest.internalShardId);
}
}
/**
* Delays the operation until countDownLatch is counted down
*/
class ActionWithDelay extends Action {
CountDownLatch countDownLatch = new CountDownLatch(1);
ActionWithDelay(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) throws IOException {
super(settings, actionName, transportService, clusterService, threadPool);
}
@Override
protected Tuple<Response, Request> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
awaitLatch();
return new Tuple<>(new Response(), shardRequest.request);
}
private void awaitLatch() throws InterruptedException {
countDownLatch.await();
countDownLatch = new CountDownLatch(1);
}
@Override
protected void shardOperationOnReplica(ShardId shardId, Request shardRequest) {
try {
awaitLatch();
} catch (InterruptedException e) {
}
}
}
/*
* Transport channel that is needed for replica operation testing.
* */
public TransportChannel createTransportChannel() {
return new TransportChannel() {
@Override
public String action() {
return null;
}
@Override
public void sendResponse(TransportResponse response) throws IOException {
}
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
}
@Override
public void sendResponse(Throwable error) throws IOException {
}
};
}
}

View File

@ -23,18 +23,23 @@ import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.junit.Test;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
/**
* Simple unit-test IndexShard related operations.
@ -95,20 +100,20 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
IndexShard shard = test.shard(0);
ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(getShardStateMetadata(shard), shardStateMetaData);
ShardRouting routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1);
ShardRouting routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version() + 1);
shard.updateRoutingEntry(routing, true);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID)));
routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1);
routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version() + 1);
shard.updateRoutingEntry(routing, true);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID)));
routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1);
routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version() + 1);
shard.updateRoutingEntry(routing, true);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
@ -122,13 +127,13 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID)));
shard.updateRoutingEntry(new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1), false);
shard.updateRoutingEntry(new MutableShardRouting(shard.shardRouting, shard.shardRouting.version() + 1), false);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertFalse("shard state persisted despite of persist=false", shardStateMetaData.equals(getShardStateMetadata(shard)));
assertEquals("shard state persisted despite of persist=false", shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID)));
routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version()+1);
routing = new MutableShardRouting(shard.shardRouting, shard.shardRouting.version() + 1);
shard.updateRoutingEntry(routing, true);
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
@ -153,15 +158,13 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
routing = new MutableShardRouting(shard.shardId.index().getName(), shard.shardId.id(), routing.currentNodeId(), routing.primary(), ShardRoutingState.INITIALIZING, shard.shardRouting.version()+1);
routing = new MutableShardRouting(shard.shardId.index().getName(), shard.shardId.id(), routing.currentNodeId(), routing.primary(), ShardRoutingState.INITIALIZING, shard.shardRouting.version() + 1);
shard.updateRoutingEntry(routing, true);
shard.deleteShardState();
assertNull("no shard state expected after delete on initializing", load(logger, env.availableShardPaths(shard.shardId)));
}
ShardStateMetaData getShardStateMetadata(IndexShard shard) {
@ -180,7 +183,7 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
assertEquals(meta.hashCode(), new ShardStateMetaData(meta.version, meta.primary, meta.indexUUID).hashCode());
assertFalse(meta.equals(new ShardStateMetaData(meta.version, !meta.primary, meta.indexUUID)));
assertFalse(meta.equals(new ShardStateMetaData(meta.version+1, meta.primary, meta.indexUUID)));
assertFalse(meta.equals(new ShardStateMetaData(meta.version + 1, meta.primary, meta.indexUUID)));
assertFalse(meta.equals(new ShardStateMetaData(meta.version, !meta.primary, meta.indexUUID + "foo")));
Set<Integer> hashCodes = new HashSet<>();
for (int i = 0; i < 30; i++) { // just a sanity check that we impl hashcode
@ -191,6 +194,41 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
}
@Test
public void testDeleteIndexDecreasesCounter() throws InterruptedException, ExecutionException, IOException {
assertAcked(client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get());
ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe("test");
IndexShard indexShard = indexService.shard(0);
client().admin().indices().prepareDelete("test").get();
assertThat(indexShard.getOperationsCount(), equalTo(0));
try {
indexShard.incrementOperationCounter();
fail("we should not be able to increment anymore");
} catch (IndexShardClosedException e) {
// expected
}
}
@Test
public void testIndexShardCounter() throws InterruptedException, ExecutionException, IOException {
assertAcked(client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get());
ensureGreen("test");
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe("test");
IndexShard indexShard = indexService.shard(0);
indexShard.incrementOperationCounter();
assertEquals(2, indexShard.getOperationsCount());
indexShard.incrementOperationCounter();
assertEquals(3, indexShard.getOperationsCount());
indexShard.decrementOperationCounter();
indexShard.decrementOperationCounter();
assertEquals(1, indexShard.getOperationsCount());
}
public static ShardStateMetaData load(ESLogger logger, Path... shardPaths) throws IOException {
return ShardStateMetaData.FORMAT.loadLatestState(logger, shardPaths);
}

View File

@ -648,6 +648,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
}
ensureClusterSizeConsistency();
ensureClusterStateConsistency();
cluster().beforeIndexDeletion();
cluster().wipe(); // wipe after to make sure we fail in the test that didn't ack the delete
if (afterClass || currentClusterScope == Scope.TEST) {
cluster().close();

View File

@ -26,16 +26,10 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.*;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
@ -80,7 +74,9 @@ import org.elasticsearch.index.cache.filter.FilterCacheModule;
import org.elasticsearch.index.cache.filter.FilterCacheModule.FilterCacheSettings;
import org.elasticsearch.index.cache.filter.index.IndexFilterCache;
import org.elasticsearch.index.cache.filter.none.NoneFilterCache;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardModule;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreModule;
import org.elasticsearch.indices.IndicesService;
@ -112,34 +108,19 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static junit.framework.Assert.fail;
import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
import static org.apache.lucene.util.LuceneTestCase.rarely;
import static org.apache.lucene.util.LuceneTestCase.usually;
import static org.apache.lucene.util.LuceneTestCase.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
import static org.elasticsearch.test.ElasticsearchTestCase.assertBusy;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@ -855,6 +836,7 @@ public final class InternalTestCluster extends TestCluster {
}
public static final String TRANSPORT_CLIENT_PREFIX = "transport_client_";
static class TransportClientFactory {
private final boolean sniff;
private final Settings settings;
@ -976,6 +958,26 @@ public final class InternalTestCluster extends TestCluster {
randomlyResetClients(); /* reset all clients - each test gets its own client based on the Random instance created above. */
}
@Override
public void beforeIndexDeletion() {
assertShardIndexCounter();
}
private void assertShardIndexCounter() {
final Collection<NodeAndClient> nodesAndClients = nodes.values();
for (NodeAndClient nodeAndClient : nodesAndClients) {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
assertThat(indexShard.getOperationsCount(), anyOf(equalTo(1), equalTo(0)));
if (indexShard.getOperationsCount() == 0) {
assertThat(indexShard.state(), equalTo(IndexShardState.CLOSED));
}
}
}
}
}
private void randomlyResetClients() throws IOException {
// only reset the clients on nightly tests, it causes heavy load...
if (RandomizedTest.isNightly() && rarely(random)) {

View File

@ -76,6 +76,12 @@ public abstract class TestCluster implements Iterable<Client>, Closeable {
wipeRepositories();
}
/**
* Assertions that should run before the cluster is wiped should be called in this method
*/
public void beforeIndexDeletion() {
}
/**
* This method checks all the things that need to be checked after each test
*/