From ba59cf12627da89bee3f36efdb7fb1b66b232698 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 8 Feb 2018 22:59:02 +0100 Subject: [PATCH] Capture stack traces while issuing IndexShard operations permits to easy debugging (#28567) Today we acquire a permit from the shard to coordinate between indexing operations, recoveries and other state transitions. When we leak an permit it's practically impossible to find who the culprit is. This PR add stack traces capturing for each permit so we can identify which part of the code is responsible for acquiring the unreleased permit. This code is only active when assertions are active. The output is something like: ``` java.lang.AssertionError: shard [test][1] on node [node_s0] has pending operations: --> java.lang.RuntimeException: something helpful 2 at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:223) at org.elasticsearch.index.shard.IndexShard.(IndexShard.java:322) at org.elasticsearch.index.IndexService.createShard(IndexService.java:382) at org.elasticsearch.indices.IndicesService.createShard(IndicesService.java:514) at org.elasticsearch.indices.IndicesService.createShard(IndicesService.java:143) at org.elasticsearch.indices.cluster.IndicesClusterStateService.createShard(IndicesClusterStateService.java:552) at org.elasticsearch.indices.cluster.IndicesClusterStateService.createOrUpdateShards(IndicesClusterStateService.java:529) at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:231) at org.elasticsearch.cluster.service.ClusterApplierService.lambda$callClusterStateAppliers$6(ClusterApplierService.java:498) at java.base/java.lang.Iterable.forEach(Iterable.java:75) at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:495) at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:482) at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:432) at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:161) at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:566) at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:244) at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:207) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) at java.base/java.lang.Thread.run(Thread.java:844) --> java.lang.RuntimeException: something helpful at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:223) at org.elasticsearch.index.shard.IndexShard.(IndexShard.java:311) at org.elasticsearch.index.IndexService.createShard(IndexService.java:382) at org.elasticsearch.indices.IndicesService.createShard(IndicesService.java:514) at org.elasticsearch.indices.IndicesService.createShard(IndicesService.java:143) at org.elasticsearch.indices.cluster.IndicesClusterStateService.createShard(IndicesClusterStateService.java:552) at org.elasticsearch.indices.cluster.IndicesClusterStateService.createOrUpdateShards(IndicesClusterStateService.java:529) at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:231) at org.elasticsearch.cluster.service.ClusterApplierService.lambda$callClusterStateAppliers$6(ClusterApplierService.java:498) at java.base/java.lang.Iterable.forEach(Iterable.java:75) at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:495) at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:482) at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:432) at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:161) at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:566) at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:244) at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:207) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) at java.base/java.lang.Thread.run(Thread.java:844) ``` --- .../TransportReplicationAction.java | 10 +-- .../org/elasticsearch/index/IndexService.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 26 ++++-- .../shard/IndexShardOperationPermits.java | 84 ++++++++++++++++--- .../recovery/RecoverySourceHandler.java | 15 ++-- .../TransportReplicationActionTests.java | 7 +- .../TransportWriteActionTests.java | 5 +- .../ESIndexLevelReplicationTestCase.java | 2 +- .../IndexShardOperationPermitsTests.java | 63 ++++++++++---- .../index/shard/IndexShardTests.java | 38 +++++---- .../flush/SyncedFlushSingleNodeTests.java | 4 +- .../recovery/RecoverySourceHandlerTests.java | 3 +- .../test/InternalTestCluster.java | 34 ++------ 13 files changed, 196 insertions(+), 97 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 1a57b6a5d95..4398c56f26c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -202,7 +202,7 @@ public abstract class TransportReplicationAction< /** * Synchronously execute the specified replica operation. This is done under a permit from - * {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String)}. + * {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String, Object)}. * * @param shardRequest the request to the replica shard * @param replica the replica shard to perform the operation on @@ -317,7 +317,7 @@ public abstract class TransportReplicationAction< @Override protected void doRun() throws Exception { - acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this); + acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this, request); } @Override @@ -638,7 +638,7 @@ public abstract class TransportReplicationAction< throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID, actualAllocationId); } - replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor); + replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor, request); } /** @@ -950,7 +950,7 @@ public abstract class TransportReplicationAction< * and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}). */ private void acquirePrimaryShardReference(ShardId shardId, String allocationId, long primaryTerm, - ActionListener onReferenceAcquired) { + ActionListener onReferenceAcquired, Object debugInfo) { IndexShard indexShard = getIndexShard(shardId); // we may end up here if the cluster state used to route the primary is so stale that the underlying // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails @@ -981,7 +981,7 @@ public abstract class TransportReplicationAction< } }; - indexShard.acquirePrimaryOperationPermit(onAcquired, executor); + indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo); } class ShardReference implements Releasable { diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 0285dcf93c1..cdcefa5e02b 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -775,7 +775,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust e); } }), - ThreadPool.Names.SAME); + ThreadPool.Names.SAME, "background global checkpoint sync"); } catch (final AlreadyClosedException | IndexShardClosedException e) { // the shard was closed concurrently, continue } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8a203cb4308..29cd4abc823 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2192,19 +2192,23 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided * ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided * ActionListener will then be called using the provided executor. + * + * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled + * the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object + * isn't used */ - public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay) { + public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay, Object debugInfo) { verifyNotClosed(); verifyPrimary(); - indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false); + indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo); } private final Object primaryTermMutex = new Object(); /** * Acquire a replica operation permit whenever the shard is ready for indexing (see - * {@link #acquirePrimaryOperationPermit(ActionListener, String)}). If the given primary term is lower than then one in + * {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an * {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified * name. @@ -2213,9 +2217,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @param globalCheckpoint the global checkpoint associated with the request * @param onPermitAcquired the listener for permit acquisition * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed + * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled + * the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object + * isn't used */ public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final long globalCheckpoint, - final ActionListener onPermitAcquired, final String executorOnDelay) { + final ActionListener onPermitAcquired, final String executorOnDelay, + final Object debugInfo) { verifyNotClosed(); verifyReplicationTarget(); final boolean globalCheckpointUpdated; @@ -2301,13 +2309,21 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } }, executorOnDelay, - true); + true, debugInfo); } public int getActiveOperationsCount() { return indexShardOperationPermits.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close } + /** + * @return a list of containing an exception for each operation permit that wasn't released yet. The stack traces of the exceptions + * was captured when the operation acquired the permit and their message contains the debug information supplied at the time. + */ + public List getActiveOperations() { + return indexShardOperationPermits.getActiveOperations(); + } + private final AsyncIOProcessor translogSyncProcessor = new AsyncIOProcessor(logger, 1024) { @Override protected void write(List>> candidates) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 75645198f5b..338b330a394 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.shard; -import org.apache.logging.log4j.Logger; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Assertions; import org.elasticsearch.action.ActionListener; @@ -33,6 +32,8 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -53,10 +54,14 @@ final class IndexShardOperationPermits implements Closeable { static final int TOTAL_PERMITS = Integer.MAX_VALUE; final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved - private final List> delayedOperations = new ArrayList<>(); // operations that are delayed + private final List delayedOperations = new ArrayList<>(); // operations that are delayed private volatile boolean closed; private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this + // only valid when assertions are enabled. Key is AtomicBoolean associated with each permit to ensure close once semantics. Value is an + // exception with some extra info in the message + a stack trace of the acquirer + private final Map issuedPermits; + /** * Construct operation permits for the specified shards. * @@ -66,6 +71,11 @@ final class IndexShardOperationPermits implements Closeable { IndexShardOperationPermits(final ShardId shardId, final ThreadPool threadPool) { this.shardId = shardId; this.threadPool = threadPool; + if (Assertions.ENABLED) { + issuedPermits = new ConcurrentHashMap<>(); + } else { + issuedPermits = null; + } } @Override @@ -167,7 +177,7 @@ final class IndexShardOperationPermits implements Closeable { } private void releaseDelayedOperations() { - final List> queuedActions; + final List queuedActions; synchronized (this) { assert delayed; queuedActions = new ArrayList<>(delayedOperations); @@ -185,8 +195,8 @@ final class IndexShardOperationPermits implements Closeable { * recovery */ threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - for (ActionListener queuedAction : queuedActions) { - acquire(queuedAction, null, false); + for (DelayedOperation queuedAction : queuedActions) { + acquire(queuedAction.listener, null, false, queuedAction.debugInfo); } }); } @@ -204,8 +214,24 @@ final class IndexShardOperationPermits implements Closeable { * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed * @param executorOnDelay executor to use for the possibly delayed {@link ActionListener#onResponse(Object)} call * @param forceExecution whether the runnable should force its execution in case it gets rejected + * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled + * the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object + * isn't used + * */ - public void acquire(final ActionListener onAcquired, final String executorOnDelay, final boolean forceExecution) { + public void acquire(final ActionListener onAcquired, final String executorOnDelay, final boolean forceExecution, + final Object debugInfo) { + final Throwable debugInfoWithStackTrace; + if (Assertions.ENABLED) { + debugInfoWithStackTrace = new Throwable(debugInfo.toString()); + } else { + debugInfoWithStackTrace = null; + } + acquire(onAcquired, executorOnDelay, forceExecution, debugInfoWithStackTrace); + } + + private void acquire(final ActionListener onAcquired, final String executorOnDelay, final boolean forceExecution, + final Throwable debugInfo) { if (closed) { onAcquired.onFailure(new IndexShardClosedException(shardId)); return; @@ -215,16 +241,18 @@ final class IndexShardOperationPermits implements Closeable { synchronized (this) { if (delayed) { final Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(false); + final ActionListener wrappedListener; if (executorOnDelay != null) { - delayedOperations.add( - new PermitAwareThreadedActionListener(threadPool, executorOnDelay, - new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution)); + wrappedListener = + new PermitAwareThreadedActionListener(threadPool, executorOnDelay, + new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution); } else { - delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired)); + wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired); } + delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo)); return; } else { - releasable = acquire(); + releasable = acquire(debugInfo); } } } catch (final InterruptedException e) { @@ -235,15 +263,23 @@ final class IndexShardOperationPermits implements Closeable { onAcquired.onResponse(releasable); } - private Releasable acquire() throws InterruptedException { + private Releasable acquire(Throwable debugInfo) throws InterruptedException { assert Thread.holdsLock(this); if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting final AtomicBoolean closed = new AtomicBoolean(); - return () -> { + final Releasable releasable = () -> { if (closed.compareAndSet(false, true)) { + if (Assertions.ENABLED) { + Throwable e = issuedPermits.remove(closed); + assert e != null; + } semaphore.release(1); } }; + if (Assertions.ENABLED) { + issuedPermits.put(closed, debugInfo); + } + return releasable; } else { // this should never happen, if it does something is deeply wrong throw new IllegalStateException("failed to obtain permit but operations are not delayed"); @@ -269,6 +305,28 @@ final class IndexShardOperationPermits implements Closeable { } } + /** + * @return a list of containing an exception for each permit that wasn't released yet. The stack traces of the exceptions + * was captured when the operation acquired the permit and their message contains the debug information supplied at the time. + */ + List getActiveOperations() { + return new ArrayList<>(issuedPermits.values()); + } + + private static class DelayedOperation { + private final ActionListener listener; + private final Throwable debugInfo; + + private DelayedOperation(ActionListener listener, Throwable debugInfo) { + this.listener = listener; + if (Assertions.ENABLED) { + this.debugInfo = new Throwable("delayed", debugInfo); + } else { + this.debugInfo = null; + } + } + } + /** * A permit-aware action listener wrapper that spawns onResponse listener invocations off on a configurable thread-pool. * Being permit-aware, it also releases the permit when hitting thread-pool rejections and falls back to the diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 47c3d073f10..3f0408a7c29 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -142,7 +142,7 @@ public class RecoverySourceHandler { throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); } assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting; - }); + }, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered "); try (Closeable ignored = shard.acquireTranslogRetentionLock()) { @@ -198,7 +198,8 @@ public class RecoverySourceHandler { * make sure to do this before sampling the max sequence number in the next step, to ensure that we send * all documents up to maxSeqNo in phase2. */ - runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId())); + runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()), + shardId + " initiating tracking of " + request.targetAllocationId()); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); /* @@ -229,10 +230,10 @@ public class RecoverySourceHandler { return targetHistoryUUID != null && targetHistoryUUID.equals(shard.getHistoryUUID()); } - private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) { + private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason) { cancellableThreads.execute(() -> { final PlainActionFuture onAcquired = new PlainActionFuture<>(); - shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME); + shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason); try (Releasable ignored = onAcquired.actionGet()) { // check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent // races, as IndexShard will change to RELOCATED only when it holds all operation permits, see IndexShard.relocated() @@ -493,10 +494,12 @@ public class RecoverySourceHandler { * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire * the permit then the state of the shard will be relocated and this recovery will fail. */ - runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint)); + runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint), + shardId + " marking " + request.targetAllocationId() + " as in sync"); final long globalCheckpoint = shard.getGlobalCheckpoint(); cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint)); - runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint)); + runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint), + shardId + " updating " + request.targetAllocationId() + "'s global checkpoint"); if (request.isPrimaryRelocation()) { logger.trace("performing relocation hand-off"); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 2112a231d37..b9688053fba 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -117,6 +117,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -694,7 +695,7 @@ public class TransportReplicationActionTests extends ESTestCase { doAnswer(invocation -> { ((ActionListener)invocation.getArguments()[0]).onResponse(() -> {}); return null; - }).when(shard).acquirePrimaryOperationPermit(any(), anyString()); + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); AtomicBoolean closed = new AtomicBoolean(); Releasable releasable = () -> { @@ -1194,7 +1195,7 @@ public class TransportReplicationActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; ActionListener callback = (ActionListener) invocation.getArguments()[2]; @@ -1206,7 +1207,7 @@ public class TransportReplicationActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 805553b4a61..bed1b5de037 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -80,6 +80,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -449,7 +450,7 @@ public class TransportWriteActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; ActionListener callback = (ActionListener) invocation.getArguments()[1]; @@ -461,7 +462,7 @@ public class TransportWriteActionTests extends ESTestCase { count.incrementAndGet(); callback.onResponse(count::decrementAndGet); return null; - }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString()); + }).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject()); when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index a67e843e468..f74ffdc4b4d 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -535,7 +535,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase listener.onFailure(e); } }, - ThreadPool.Names.INDEX); + ThreadPool.Names.INDEX, request); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index ab5192dfc3e..169f18da09a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -35,6 +35,7 @@ import org.junit.Before; import org.junit.BeforeClass; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; @@ -48,10 +49,15 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.stream.Collectors; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; @@ -122,7 +128,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase { public void run() { latch.countDown(); try { - permits.acquire(future, threadPoolName, forceExecution); + permits.acquire(future, threadPoolName, forceExecution, ""); } catch (DummyException dummyException) { // ok, notify future assertTrue(failingListener); @@ -176,7 +182,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase { public void testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionException, InterruptedException { PlainActionFuture future = new PlainActionFuture<>(); - permits.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); assertTrue(future.isDone()); future.get().close(); } @@ -184,7 +190,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase { public void testOperationsIfClosed() throws ExecutionException, InterruptedException { PlainActionFuture future = new PlainActionFuture<>(); permits.close(); - permits.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); ExecutionException exception = expectThrows(ExecutionException.class, future::get); assertThat(exception.getCause(), instanceOf(IndexShardClosedException.class)); } @@ -198,7 +204,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase { public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException { PlainActionFuture future = new PlainActionFuture<>(); try (Releasable ignored = blockAndWait()) { - permits.acquire(future, ThreadPool.Names.GENERIC, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); assertFalse(future.isDone()); } future.get(1, TimeUnit.HOURS).close(); @@ -245,8 +251,8 @@ public class IndexShardOperationPermitsTests extends ESTestCase { context.putHeader("foo", "bar"); context.putTransient("bar", "baz"); // test both with and without a executor name - permits.acquire(future, ThreadPool.Names.GENERIC, true); - permits.acquire(future2, null, true); + permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); + permits.acquire(future2, null, true, ""); } assertFalse(future.isDone()); } @@ -329,7 +335,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase { } }, ThreadPool.Names.GENERIC, - false)); + false, "")); thread.start(); assertFalse(delayed.get()); releaseBlock.countDown(); @@ -387,7 +393,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase { } }, ThreadPool.Names.GENERIC, - false); + false, ""); }); secondOperationThread.start(); @@ -436,7 +442,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase { } }, ThreadPool.Names.GENERIC, - false); + false, ""); }); thread.start(); threads.add(thread); @@ -490,12 +496,12 @@ public class IndexShardOperationPermitsTests extends ESTestCase { public void testActiveOperationsCount() throws ExecutionException, InterruptedException { PlainActionFuture future1 = new PlainActionFuture<>(); - permits.acquire(future1, ThreadPool.Names.GENERIC, true); + permits.acquire(future1, ThreadPool.Names.GENERIC, true, ""); assertTrue(future1.isDone()); assertThat(permits.getActiveOperationsCount(), equalTo(1)); PlainActionFuture future2 = new PlainActionFuture<>(); - permits.acquire(future2, ThreadPool.Names.GENERIC, true); + permits.acquire(future2, ThreadPool.Names.GENERIC, true, ""); assertTrue(future2.isDone()); assertThat(permits.getActiveOperationsCount(), equalTo(2)); @@ -511,7 +517,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase { } PlainActionFuture future3 = new PlainActionFuture<>(); - permits.acquire(future3, ThreadPool.Names.GENERIC, true); + permits.acquire(future3, ThreadPool.Names.GENERIC, true, ""); assertTrue(future3.isDone()); assertThat(permits.getActiveOperationsCount(), equalTo(1)); future3.get().close(); @@ -594,7 +600,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase { } }, ThreadPool.Names.GENERIC, - false)); + false, "")); assertThat(e, hasToString(containsString("failed to obtain permit but operations are not delayed"))); permits.semaphore.release(IndexShardOperationPermits.TOTAL_PERMITS); } @@ -645,8 +651,37 @@ public class IndexShardOperationPermitsTests extends ESTestCase { } }, ThreadPool.Names.GENERIC, - false); + false, ""); }; } + public void testPermitTraceCapturing() throws ExecutionException, InterruptedException { + final PlainActionFuture listener1 = new PlainActionFuture<>(); + permits.acquire(listener1, null, false, "listener1"); + final PlainActionFuture listener2 = new PlainActionFuture<>(); + permits.acquire(listener2, null, false, "listener2"); + + assertThat(permits.getActiveOperationsCount(), equalTo(2)); + List messages = permits.getActiveOperations().stream().map(Throwable::getMessage).collect(Collectors.toList()); + assertThat(messages, hasSize(2)); + assertThat(messages, containsInAnyOrder(Arrays.asList(containsString("listener1"), containsString("listener2")))); + + if (randomBoolean()) { + listener1.get().close(); + assertThat(permits.getActiveOperationsCount(), equalTo(1)); + messages = permits.getActiveOperations().stream().map(Throwable::getMessage).collect(Collectors.toList()); + assertThat(messages, hasSize(1)); + assertThat(messages, contains(containsString("listener2"))); + listener2.get().close(); + } else { + listener2.get().close(); + assertThat(permits.getActiveOperationsCount(), equalTo(1)); + messages = permits.getActiveOperations().stream().map(Throwable::getMessage).collect(Collectors.toList()); + assertThat(messages, hasSize(1)); + assertThat(messages, contains(containsString("listener1"))); + listener1.get().close(); + } + assertThat(permits.getActiveOperationsCount(), equalTo(0)); + assertThat(permits.getActiveOperations(), emptyIterable()); + } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 77f6149c4a3..1370dad06d7 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -150,7 +150,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasKey; -import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; @@ -282,14 +281,14 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(indexShard); assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); try { - indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX); + indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX, ""); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected } try { indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, null, - ThreadPool.Names.INDEX); + ThreadPool.Names.INDEX, ""); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected @@ -300,7 +299,7 @@ public class IndexShardTests extends IndexShardTestCase { IndexShard indexShard = newShard(false); expectThrows(IndexShardNotStartedException.class, () -> indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100), - SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX)); + SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX, "")); closeShards(indexShard); } @@ -313,6 +312,7 @@ public class IndexShardTests extends IndexShardTestCase { final CountDownLatch operationLatch = new CountDownLatch(1); final List threads = new ArrayList<>(); for (int i = 0; i < operations; i++) { + final String id = "t_" + i; final Thread thread = new Thread(() -> { try { barrier.await(); @@ -339,7 +339,7 @@ public class IndexShardTests extends IndexShardTestCase { throw new RuntimeException(e); } }, - ThreadPool.Names.INDEX); + ThreadPool.Names.INDEX, id); }); thread.start(); threads.add(thread); @@ -369,6 +369,7 @@ public class IndexShardTests extends IndexShardTestCase { final AtomicLong counter = new AtomicLong(); final List delayedThreads = new ArrayList<>(); for (int i = 0; i < delayedOperations; i++) { + final String id = "d_" + i; final Thread thread = new Thread(() -> { try { delayedOperationsBarrier.await(); @@ -389,7 +390,7 @@ public class IndexShardTests extends IndexShardTestCase { throw new RuntimeException(e); } }, - ThreadPool.Names.INDEX); + ThreadPool.Names.INDEX, id); }); thread.start(); delayedThreads.add(thread); @@ -503,7 +504,7 @@ public class IndexShardTests extends IndexShardTestCase { throw new RuntimeException(e); } }, - ThreadPool.Names.GENERIC); + ThreadPool.Names.GENERIC, ""); latch.await(); assertThat(indexShard.getLocalCheckpoint(), equalTo((long) maxSeqNo)); @@ -547,7 +548,7 @@ public class IndexShardTests extends IndexShardTestCase { throw new RuntimeException(e); } }, - ThreadPool.Names.GENERIC); + ThreadPool.Names.GENERIC, ""); latch.await(); assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(currentTranslogGeneration + 1)); @@ -580,7 +581,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(0, indexShard.getActiveOperationsCount()); if (indexShard.routingEntry().isRelocationTarget() == false) { try { - indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.INDEX, ""); fail("shard shouldn't accept operations as replica"); } catch (IllegalStateException ignored) { @@ -599,14 +600,14 @@ public class IndexShardTests extends IndexShardTestCase { private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX); + indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX, ""); return fut.get(); } private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm) throws ExecutionException, InterruptedException { PlainActionFuture fut = new PlainActionFuture<>(); - indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.INDEX); + indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.INDEX, ""); return fut.get(); } @@ -653,7 +654,8 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(0, indexShard.getActiveOperationsCount()); if (shardRouting.primary() == false) { final IllegalStateException e = - expectThrows(IllegalStateException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX)); + expectThrows(IllegalStateException.class, + () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX, "")); assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary"))); } @@ -690,7 +692,7 @@ public class IndexShardTests extends IndexShardTestCase { }; indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO, onLockAcquired, - ThreadPool.Names.INDEX); + ThreadPool.Names.INDEX, ""); assertFalse(onResponse.get()); assertTrue(onFailure.get()); @@ -762,7 +764,7 @@ public class IndexShardTests extends IndexShardTestCase { newPrimaryTerm, newGlobalCheckPoint, listener, - ThreadPool.Names.SAME); + ThreadPool.Names.SAME, ""); } catch (Exception e) { listener.onFailure(e); } @@ -900,7 +902,7 @@ public class IndexShardTests extends IndexShardTestCase { } }, - ThreadPool.Names.SAME); + ThreadPool.Names.SAME, ""); latch.await(); @@ -954,7 +956,7 @@ public class IndexShardTests extends IndexShardTestCase { } }, - ThreadPool.Names.SAME); + ThreadPool.Names.SAME, ""); latch.await(); if (globalCheckpointOnReplica == SequenceNumbers.UNASSIGNED_SEQ_NO @@ -1006,7 +1008,7 @@ public class IndexShardTests extends IndexShardTestCase { latch.countDown(); } }, - ThreadPool.Names.INDEX); + ThreadPool.Names.INDEX, ""); }; final long firstIncrement = 1 + (randomBoolean() ? 0 : 1); @@ -1367,7 +1369,7 @@ public class IndexShardTests extends IndexShardTestCase { super.onResponse(releasable); } }; - shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.INDEX); + shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.INDEX, "i_" + i); onLockAcquiredActions.add(onLockAcquired); } diff --git a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java index 3d7ef82ea12..6561001ad7d 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -38,7 +37,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { @@ -115,7 +113,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); PlainActionFuture fut = new PlainActionFuture<>(); - shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX); + shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX, ""); try (Releasable operationLock = fut.get()) { SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); flushService.attemptSyncedFlush(shardId, listener); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 7ab6925ce57..4312757f03c 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -88,6 +88,7 @@ import static java.util.Collections.emptySet; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -400,7 +401,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { doAnswer(invocation -> { ((ActionListener)invocation.getArguments()[0]).onResponse(() -> {}); return null; - }).when(shard).acquirePrimaryOperationPermit(any(), anyString()); + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); final AtomicBoolean phase1Called = new AtomicBoolean(); final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean(); final AtomicBoolean phase2Called = new AtomicBoolean(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 29b0abc5158..5d9cc7e1d32 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -28,11 +28,10 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag; -import org.elasticsearch.action.support.replication.ReplicationTask; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.ClusterName; @@ -66,8 +65,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ZenDiscovery; @@ -93,8 +90,6 @@ import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchService; -import org.elasticsearch.tasks.TaskInfo; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.MockTransportClient; @@ -1106,7 +1101,7 @@ public final class InternalTestCluster extends TestCluster { // test that have ongoing write operations after the test (for example because ttl is used // and not all docs have been purged after the test) and inherit from // ElasticsearchIntegrationTest must override beforeIndexDeletion() to avoid failures. - assertShardIndexCounter(); + assertNoPendingIndexOperations(); //check that shards that have same sync id also contain same number of documents assertSameSyncIdSameDocs(); assertOpenTranslogReferences(); @@ -1136,30 +1131,19 @@ public final class InternalTestCluster extends TestCluster { } } - private void assertShardIndexCounter() throws Exception { + private void assertNoPendingIndexOperations() throws Exception { assertBusy(() -> { final Collection nodesAndClients = nodes.values(); for (NodeAndClient nodeAndClient : nodesAndClients) { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - int activeOperationsCount = indexShard.getActiveOperationsCount(); - if (activeOperationsCount > 0) { - TaskManager taskManager = getInstance(TransportService.class, nodeAndClient.name).getTaskManager(); - DiscoveryNode localNode = getInstance(ClusterService.class, nodeAndClient.name).localNode(); - List taskInfos = taskManager.getTasks().values().stream() - .filter(task -> task instanceof ReplicationTask) - .map(task -> task.taskInfo(localNode.getId(), true)) - .collect(Collectors.toList()); - ListTasksResponse response = new ListTasksResponse(taskInfos, Collections.emptyList(), Collections.emptyList()); - try { - XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint().value(response); - throw new AssertionError("expected index shard counter on shard " + indexShard.shardId() + " on node " + - nodeAndClient.name + " to be 0 but was " + activeOperationsCount + ". Current replication tasks on node:\n" + - builder.string()); - } catch (IOException e) { - throw new RuntimeException("caught exception while building response [" + response + "]", e); - } + List operations = indexShard.getActiveOperations(); + if (operations.size() > 0) { + throw new AssertionError( + "shard " + indexShard.shardId() + " on node [" + nodeAndClient.name + "] has pending operations:\n" + + operations.stream().map(e -> "--> " + ExceptionsHelper.stackTrace(e)).collect(Collectors.joining("\n")) + ); } } }