Capture stack traces while issuing IndexShard operations permits to easy debugging (#28567)
Today we acquire a permit from the shard to coordinate between indexing operations, recoveries and other state transitions. When we leak an permit it's practically impossible to find who the culprit is. This PR add stack traces capturing for each permit so we can identify which part of the code is responsible for acquiring the unreleased permit. This code is only active when assertions are active. The output is something like: ``` java.lang.AssertionError: shard [test][1] on node [node_s0] has pending operations: --> java.lang.RuntimeException: something helpful 2 at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:223) at org.elasticsearch.index.shard.IndexShard.<init>(IndexShard.java:322) at org.elasticsearch.index.IndexService.createShard(IndexService.java:382) at org.elasticsearch.indices.IndicesService.createShard(IndicesService.java:514) at org.elasticsearch.indices.IndicesService.createShard(IndicesService.java:143) at org.elasticsearch.indices.cluster.IndicesClusterStateService.createShard(IndicesClusterStateService.java:552) at org.elasticsearch.indices.cluster.IndicesClusterStateService.createOrUpdateShards(IndicesClusterStateService.java:529) at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:231) at org.elasticsearch.cluster.service.ClusterApplierService.lambda$callClusterStateAppliers$6(ClusterApplierService.java:498) at java.base/java.lang.Iterable.forEach(Iterable.java:75) at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:495) at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:482) at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:432) at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:161) at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:566) at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:244) at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:207) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) at java.base/java.lang.Thread.run(Thread.java:844) --> java.lang.RuntimeException: something helpful at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:223) at org.elasticsearch.index.shard.IndexShard.<init>(IndexShard.java:311) at org.elasticsearch.index.IndexService.createShard(IndexService.java:382) at org.elasticsearch.indices.IndicesService.createShard(IndicesService.java:514) at org.elasticsearch.indices.IndicesService.createShard(IndicesService.java:143) at org.elasticsearch.indices.cluster.IndicesClusterStateService.createShard(IndicesClusterStateService.java:552) at org.elasticsearch.indices.cluster.IndicesClusterStateService.createOrUpdateShards(IndicesClusterStateService.java:529) at org.elasticsearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:231) at org.elasticsearch.cluster.service.ClusterApplierService.lambda$callClusterStateAppliers$6(ClusterApplierService.java:498) at java.base/java.lang.Iterable.forEach(Iterable.java:75) at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:495) at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:482) at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:432) at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:161) at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:566) at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:244) at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:207) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) at java.base/java.lang.Thread.run(Thread.java:844) ```
This commit is contained in:
parent
5b8870f193
commit
ba59cf1262
|
@ -202,7 +202,7 @@ public abstract class TransportReplicationAction<
|
|||
|
||||
/**
|
||||
* Synchronously execute the specified replica operation. This is done under a permit from
|
||||
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String)}.
|
||||
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String, Object)}.
|
||||
*
|
||||
* @param shardRequest the request to the replica shard
|
||||
* @param replica the replica shard to perform the operation on
|
||||
|
@ -317,7 +317,7 @@ public abstract class TransportReplicationAction<
|
|||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this);
|
||||
acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -638,7 +638,7 @@ public abstract class TransportReplicationAction<
|
|||
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
|
||||
actualAllocationId);
|
||||
}
|
||||
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor);
|
||||
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor, request);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -950,7 +950,7 @@ public abstract class TransportReplicationAction<
|
|||
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}).
|
||||
*/
|
||||
private void acquirePrimaryShardReference(ShardId shardId, String allocationId, long primaryTerm,
|
||||
ActionListener<PrimaryShardReference> onReferenceAcquired) {
|
||||
ActionListener<PrimaryShardReference> onReferenceAcquired, Object debugInfo) {
|
||||
IndexShard indexShard = getIndexShard(shardId);
|
||||
// we may end up here if the cluster state used to route the primary is so stale that the underlying
|
||||
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
|
||||
|
@ -981,7 +981,7 @@ public abstract class TransportReplicationAction<
|
|||
}
|
||||
};
|
||||
|
||||
indexShard.acquirePrimaryOperationPermit(onAcquired, executor);
|
||||
indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo);
|
||||
}
|
||||
|
||||
class ShardReference implements Releasable {
|
||||
|
|
|
@ -775,7 +775,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
|
|||
e);
|
||||
}
|
||||
}),
|
||||
ThreadPool.Names.SAME);
|
||||
ThreadPool.Names.SAME, "background global checkpoint sync");
|
||||
} catch (final AlreadyClosedException | IndexShardClosedException e) {
|
||||
// the shard was closed concurrently, continue
|
||||
}
|
||||
|
|
|
@ -2192,19 +2192,23 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
|
||||
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
|
||||
* ActionListener will then be called using the provided executor.
|
||||
*
|
||||
* @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled
|
||||
* the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object
|
||||
* isn't used
|
||||
*/
|
||||
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay) {
|
||||
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo) {
|
||||
verifyNotClosed();
|
||||
verifyPrimary();
|
||||
|
||||
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false);
|
||||
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
|
||||
}
|
||||
|
||||
private final Object primaryTermMutex = new Object();
|
||||
|
||||
/**
|
||||
* Acquire a replica operation permit whenever the shard is ready for indexing (see
|
||||
* {@link #acquirePrimaryOperationPermit(ActionListener, String)}). If the given primary term is lower than then one in
|
||||
* {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in
|
||||
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
|
||||
* {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified
|
||||
* name.
|
||||
|
@ -2213,9 +2217,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
* @param globalCheckpoint the global checkpoint associated with the request
|
||||
* @param onPermitAcquired the listener for permit acquisition
|
||||
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
|
||||
* @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled
|
||||
* the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object
|
||||
* isn't used
|
||||
*/
|
||||
public void acquireReplicaOperationPermit(final long operationPrimaryTerm, final long globalCheckpoint,
|
||||
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
|
||||
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
|
||||
final Object debugInfo) {
|
||||
verifyNotClosed();
|
||||
verifyReplicationTarget();
|
||||
final boolean globalCheckpointUpdated;
|
||||
|
@ -2301,13 +2309,21 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
}
|
||||
},
|
||||
executorOnDelay,
|
||||
true);
|
||||
true, debugInfo);
|
||||
}
|
||||
|
||||
public int getActiveOperationsCount() {
|
||||
return indexShardOperationPermits.getActiveOperationsCount(); // refCount is incremented on successful acquire and decremented on close
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a list of containing an exception for each operation permit that wasn't released yet. The stack traces of the exceptions
|
||||
* was captured when the operation acquired the permit and their message contains the debug information supplied at the time.
|
||||
*/
|
||||
public List<Throwable> getActiveOperations() {
|
||||
return indexShardOperationPermits.getActiveOperations();
|
||||
}
|
||||
|
||||
private final AsyncIOProcessor<Translog.Location> translogSyncProcessor = new AsyncIOProcessor<Translog.Location>(logger, 1024) {
|
||||
@Override
|
||||
protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candidates) throws IOException {
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.index.shard;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Assertions;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
|
@ -33,6 +32,8 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
@ -53,10 +54,14 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
|
||||
static final int TOTAL_PERMITS = Integer.MAX_VALUE;
|
||||
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved
|
||||
private final List<ActionListener<Releasable>> delayedOperations = new ArrayList<>(); // operations that are delayed
|
||||
private final List<DelayedOperation> delayedOperations = new ArrayList<>(); // operations that are delayed
|
||||
private volatile boolean closed;
|
||||
private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this
|
||||
|
||||
// only valid when assertions are enabled. Key is AtomicBoolean associated with each permit to ensure close once semantics. Value is an
|
||||
// exception with some extra info in the message + a stack trace of the acquirer
|
||||
private final Map<AtomicBoolean, Throwable> issuedPermits;
|
||||
|
||||
/**
|
||||
* Construct operation permits for the specified shards.
|
||||
*
|
||||
|
@ -66,6 +71,11 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
IndexShardOperationPermits(final ShardId shardId, final ThreadPool threadPool) {
|
||||
this.shardId = shardId;
|
||||
this.threadPool = threadPool;
|
||||
if (Assertions.ENABLED) {
|
||||
issuedPermits = new ConcurrentHashMap<>();
|
||||
} else {
|
||||
issuedPermits = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -167,7 +177,7 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
}
|
||||
|
||||
private void releaseDelayedOperations() {
|
||||
final List<ActionListener<Releasable>> queuedActions;
|
||||
final List<DelayedOperation> queuedActions;
|
||||
synchronized (this) {
|
||||
assert delayed;
|
||||
queuedActions = new ArrayList<>(delayedOperations);
|
||||
|
@ -185,8 +195,8 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
* recovery
|
||||
*/
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
for (ActionListener<Releasable> queuedAction : queuedActions) {
|
||||
acquire(queuedAction, null, false);
|
||||
for (DelayedOperation queuedAction : queuedActions) {
|
||||
acquire(queuedAction.listener, null, false, queuedAction.debugInfo);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -204,8 +214,24 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
|
||||
* @param executorOnDelay executor to use for the possibly delayed {@link ActionListener#onResponse(Object)} call
|
||||
* @param forceExecution whether the runnable should force its execution in case it gets rejected
|
||||
* @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled
|
||||
* the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object
|
||||
* isn't used
|
||||
*
|
||||
*/
|
||||
public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution) {
|
||||
public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution,
|
||||
final Object debugInfo) {
|
||||
final Throwable debugInfoWithStackTrace;
|
||||
if (Assertions.ENABLED) {
|
||||
debugInfoWithStackTrace = new Throwable(debugInfo.toString());
|
||||
} else {
|
||||
debugInfoWithStackTrace = null;
|
||||
}
|
||||
acquire(onAcquired, executorOnDelay, forceExecution, debugInfoWithStackTrace);
|
||||
}
|
||||
|
||||
private void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution,
|
||||
final Throwable debugInfo) {
|
||||
if (closed) {
|
||||
onAcquired.onFailure(new IndexShardClosedException(shardId));
|
||||
return;
|
||||
|
@ -215,16 +241,18 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
synchronized (this) {
|
||||
if (delayed) {
|
||||
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
|
||||
final ActionListener<Releasable> wrappedListener;
|
||||
if (executorOnDelay != null) {
|
||||
delayedOperations.add(
|
||||
new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
|
||||
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
|
||||
wrappedListener =
|
||||
new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
|
||||
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution);
|
||||
} else {
|
||||
delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
|
||||
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
|
||||
}
|
||||
delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo));
|
||||
return;
|
||||
} else {
|
||||
releasable = acquire();
|
||||
releasable = acquire(debugInfo);
|
||||
}
|
||||
}
|
||||
} catch (final InterruptedException e) {
|
||||
|
@ -235,15 +263,23 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
onAcquired.onResponse(releasable);
|
||||
}
|
||||
|
||||
private Releasable acquire() throws InterruptedException {
|
||||
private Releasable acquire(Throwable debugInfo) throws InterruptedException {
|
||||
assert Thread.holdsLock(this);
|
||||
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting
|
||||
final AtomicBoolean closed = new AtomicBoolean();
|
||||
return () -> {
|
||||
final Releasable releasable = () -> {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
if (Assertions.ENABLED) {
|
||||
Throwable e = issuedPermits.remove(closed);
|
||||
assert e != null;
|
||||
}
|
||||
semaphore.release(1);
|
||||
}
|
||||
};
|
||||
if (Assertions.ENABLED) {
|
||||
issuedPermits.put(closed, debugInfo);
|
||||
}
|
||||
return releasable;
|
||||
} else {
|
||||
// this should never happen, if it does something is deeply wrong
|
||||
throw new IllegalStateException("failed to obtain permit but operations are not delayed");
|
||||
|
@ -269,6 +305,28 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a list of containing an exception for each permit that wasn't released yet. The stack traces of the exceptions
|
||||
* was captured when the operation acquired the permit and their message contains the debug information supplied at the time.
|
||||
*/
|
||||
List<Throwable> getActiveOperations() {
|
||||
return new ArrayList<>(issuedPermits.values());
|
||||
}
|
||||
|
||||
private static class DelayedOperation {
|
||||
private final ActionListener<Releasable> listener;
|
||||
private final Throwable debugInfo;
|
||||
|
||||
private DelayedOperation(ActionListener<Releasable> listener, Throwable debugInfo) {
|
||||
this.listener = listener;
|
||||
if (Assertions.ENABLED) {
|
||||
this.debugInfo = new Throwable("delayed", debugInfo);
|
||||
} else {
|
||||
this.debugInfo = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A permit-aware action listener wrapper that spawns onResponse listener invocations off on a configurable thread-pool.
|
||||
* Being permit-aware, it also releases the permit when hitting thread-pool rejections and falls back to the
|
||||
|
|
|
@ -142,7 +142,7 @@ public class RecoverySourceHandler {
|
|||
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
|
||||
}
|
||||
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
|
||||
});
|
||||
}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ");
|
||||
|
||||
try (Closeable ignored = shard.acquireTranslogRetentionLock()) {
|
||||
|
||||
|
@ -198,7 +198,8 @@ public class RecoverySourceHandler {
|
|||
* make sure to do this before sampling the max sequence number in the next step, to ensure that we send
|
||||
* all documents up to maxSeqNo in phase2.
|
||||
*/
|
||||
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()));
|
||||
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
|
||||
shardId + " initiating tracking of " + request.targetAllocationId());
|
||||
|
||||
final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
|
||||
/*
|
||||
|
@ -229,10 +230,10 @@ public class RecoverySourceHandler {
|
|||
return targetHistoryUUID != null && targetHistoryUUID.equals(shard.getHistoryUUID());
|
||||
}
|
||||
|
||||
private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable) {
|
||||
private void runUnderPrimaryPermit(CancellableThreads.Interruptable runnable, String reason) {
|
||||
cancellableThreads.execute(() -> {
|
||||
final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
|
||||
shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME);
|
||||
shard.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
|
||||
try (Releasable ignored = onAcquired.actionGet()) {
|
||||
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
|
||||
// races, as IndexShard will change to RELOCATED only when it holds all operation permits, see IndexShard.relocated()
|
||||
|
@ -493,10 +494,12 @@ public class RecoverySourceHandler {
|
|||
* marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire
|
||||
* the permit then the state of the shard will be relocated and this recovery will fail.
|
||||
*/
|
||||
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint));
|
||||
runUnderPrimaryPermit(() -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint),
|
||||
shardId + " marking " + request.targetAllocationId() + " as in sync");
|
||||
final long globalCheckpoint = shard.getGlobalCheckpoint();
|
||||
cancellableThreads.executeIO(() -> recoveryTarget.finalizeRecovery(globalCheckpoint));
|
||||
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint));
|
||||
runUnderPrimaryPermit(() -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint),
|
||||
shardId + " updating " + request.targetAllocationId() + "'s global checkpoint");
|
||||
|
||||
if (request.isPrimaryRelocation()) {
|
||||
logger.trace("performing relocation hand-off");
|
||||
|
|
|
@ -117,6 +117,7 @@ import static org.hamcrest.Matchers.nullValue;
|
|||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
|
@ -694,7 +695,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
doAnswer(invocation -> {
|
||||
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {});
|
||||
return null;
|
||||
}).when(shard).acquirePrimaryOperationPermit(any(), anyString());
|
||||
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject());
|
||||
|
||||
AtomicBoolean closed = new AtomicBoolean();
|
||||
Releasable releasable = () -> {
|
||||
|
@ -1194,7 +1195,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
count.incrementAndGet();
|
||||
callback.onResponse(count::decrementAndGet);
|
||||
return null;
|
||||
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString());
|
||||
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
|
||||
doAnswer(invocation -> {
|
||||
long term = (Long)invocation.getArguments()[0];
|
||||
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[2];
|
||||
|
@ -1206,7 +1207,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
count.incrementAndGet();
|
||||
callback.onResponse(count::decrementAndGet);
|
||||
return null;
|
||||
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString());
|
||||
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject());
|
||||
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
|
||||
final ClusterState state = clusterService.state();
|
||||
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
|
||||
|
|
|
@ -80,6 +80,7 @@ import static org.hamcrest.Matchers.instanceOf;
|
|||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
|
@ -449,7 +450,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
count.incrementAndGet();
|
||||
callback.onResponse(count::decrementAndGet);
|
||||
return null;
|
||||
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString());
|
||||
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
|
||||
doAnswer(invocation -> {
|
||||
long term = (Long)invocation.getArguments()[0];
|
||||
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[1];
|
||||
|
@ -461,7 +462,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
count.incrementAndGet();
|
||||
callback.onResponse(count::decrementAndGet);
|
||||
return null;
|
||||
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString());
|
||||
}).when(indexShard).acquireReplicaOperationPermit(anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject());
|
||||
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
|
||||
final ClusterState state = clusterService.state();
|
||||
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
|
||||
|
|
|
@ -535,7 +535,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
|
|||
listener.onFailure(e);
|
||||
}
|
||||
},
|
||||
ThreadPool.Names.INDEX);
|
||||
ThreadPool.Names.INDEX, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.junit.Before;
|
|||
import org.junit.BeforeClass;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
@ -48,10 +49,15 @@ import java.util.concurrent.TimeoutException;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.either;
|
||||
import static org.hamcrest.Matchers.emptyIterable;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.hasToString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
|
@ -122,7 +128,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
public void run() {
|
||||
latch.countDown();
|
||||
try {
|
||||
permits.acquire(future, threadPoolName, forceExecution);
|
||||
permits.acquire(future, threadPoolName, forceExecution, "");
|
||||
} catch (DummyException dummyException) {
|
||||
// ok, notify future
|
||||
assertTrue(failingListener);
|
||||
|
@ -176,7 +182,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
|
||||
public void testOperationsInvokedImmediatelyIfNoBlock() throws ExecutionException, InterruptedException {
|
||||
PlainActionFuture<Releasable> future = new PlainActionFuture<>();
|
||||
permits.acquire(future, ThreadPool.Names.GENERIC, true);
|
||||
permits.acquire(future, ThreadPool.Names.GENERIC, true, "");
|
||||
assertTrue(future.isDone());
|
||||
future.get().close();
|
||||
}
|
||||
|
@ -184,7 +190,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
public void testOperationsIfClosed() throws ExecutionException, InterruptedException {
|
||||
PlainActionFuture<Releasable> future = new PlainActionFuture<>();
|
||||
permits.close();
|
||||
permits.acquire(future, ThreadPool.Names.GENERIC, true);
|
||||
permits.acquire(future, ThreadPool.Names.GENERIC, true, "");
|
||||
ExecutionException exception = expectThrows(ExecutionException.class, future::get);
|
||||
assertThat(exception.getCause(), instanceOf(IndexShardClosedException.class));
|
||||
}
|
||||
|
@ -198,7 +204,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException {
|
||||
PlainActionFuture<Releasable> future = new PlainActionFuture<>();
|
||||
try (Releasable ignored = blockAndWait()) {
|
||||
permits.acquire(future, ThreadPool.Names.GENERIC, true);
|
||||
permits.acquire(future, ThreadPool.Names.GENERIC, true, "");
|
||||
assertFalse(future.isDone());
|
||||
}
|
||||
future.get(1, TimeUnit.HOURS).close();
|
||||
|
@ -245,8 +251,8 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
context.putHeader("foo", "bar");
|
||||
context.putTransient("bar", "baz");
|
||||
// test both with and without a executor name
|
||||
permits.acquire(future, ThreadPool.Names.GENERIC, true);
|
||||
permits.acquire(future2, null, true);
|
||||
permits.acquire(future, ThreadPool.Names.GENERIC, true, "");
|
||||
permits.acquire(future2, null, true, "");
|
||||
}
|
||||
assertFalse(future.isDone());
|
||||
}
|
||||
|
@ -329,7 +335,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
}
|
||||
},
|
||||
ThreadPool.Names.GENERIC,
|
||||
false));
|
||||
false, ""));
|
||||
thread.start();
|
||||
assertFalse(delayed.get());
|
||||
releaseBlock.countDown();
|
||||
|
@ -387,7 +393,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
}
|
||||
},
|
||||
ThreadPool.Names.GENERIC,
|
||||
false);
|
||||
false, "");
|
||||
});
|
||||
secondOperationThread.start();
|
||||
|
||||
|
@ -436,7 +442,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
}
|
||||
},
|
||||
ThreadPool.Names.GENERIC,
|
||||
false);
|
||||
false, "");
|
||||
});
|
||||
thread.start();
|
||||
threads.add(thread);
|
||||
|
@ -490,12 +496,12 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
|
||||
public void testActiveOperationsCount() throws ExecutionException, InterruptedException {
|
||||
PlainActionFuture<Releasable> future1 = new PlainActionFuture<>();
|
||||
permits.acquire(future1, ThreadPool.Names.GENERIC, true);
|
||||
permits.acquire(future1, ThreadPool.Names.GENERIC, true, "");
|
||||
assertTrue(future1.isDone());
|
||||
assertThat(permits.getActiveOperationsCount(), equalTo(1));
|
||||
|
||||
PlainActionFuture<Releasable> future2 = new PlainActionFuture<>();
|
||||
permits.acquire(future2, ThreadPool.Names.GENERIC, true);
|
||||
permits.acquire(future2, ThreadPool.Names.GENERIC, true, "");
|
||||
assertTrue(future2.isDone());
|
||||
assertThat(permits.getActiveOperationsCount(), equalTo(2));
|
||||
|
||||
|
@ -511,7 +517,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
}
|
||||
|
||||
PlainActionFuture<Releasable> future3 = new PlainActionFuture<>();
|
||||
permits.acquire(future3, ThreadPool.Names.GENERIC, true);
|
||||
permits.acquire(future3, ThreadPool.Names.GENERIC, true, "");
|
||||
assertTrue(future3.isDone());
|
||||
assertThat(permits.getActiveOperationsCount(), equalTo(1));
|
||||
future3.get().close();
|
||||
|
@ -594,7 +600,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
}
|
||||
},
|
||||
ThreadPool.Names.GENERIC,
|
||||
false));
|
||||
false, ""));
|
||||
assertThat(e, hasToString(containsString("failed to obtain permit but operations are not delayed")));
|
||||
permits.semaphore.release(IndexShardOperationPermits.TOTAL_PERMITS);
|
||||
}
|
||||
|
@ -645,8 +651,37 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
}
|
||||
},
|
||||
ThreadPool.Names.GENERIC,
|
||||
false);
|
||||
false, "");
|
||||
};
|
||||
}
|
||||
|
||||
public void testPermitTraceCapturing() throws ExecutionException, InterruptedException {
|
||||
final PlainActionFuture<Releasable> listener1 = new PlainActionFuture<>();
|
||||
permits.acquire(listener1, null, false, "listener1");
|
||||
final PlainActionFuture<Releasable> listener2 = new PlainActionFuture<>();
|
||||
permits.acquire(listener2, null, false, "listener2");
|
||||
|
||||
assertThat(permits.getActiveOperationsCount(), equalTo(2));
|
||||
List<String> messages = permits.getActiveOperations().stream().map(Throwable::getMessage).collect(Collectors.toList());
|
||||
assertThat(messages, hasSize(2));
|
||||
assertThat(messages, containsInAnyOrder(Arrays.asList(containsString("listener1"), containsString("listener2"))));
|
||||
|
||||
if (randomBoolean()) {
|
||||
listener1.get().close();
|
||||
assertThat(permits.getActiveOperationsCount(), equalTo(1));
|
||||
messages = permits.getActiveOperations().stream().map(Throwable::getMessage).collect(Collectors.toList());
|
||||
assertThat(messages, hasSize(1));
|
||||
assertThat(messages, contains(containsString("listener2")));
|
||||
listener2.get().close();
|
||||
} else {
|
||||
listener2.get().close();
|
||||
assertThat(permits.getActiveOperationsCount(), equalTo(1));
|
||||
messages = permits.getActiveOperations().stream().map(Throwable::getMessage).collect(Collectors.toList());
|
||||
assertThat(messages, hasSize(1));
|
||||
assertThat(messages, contains(containsString("listener1")));
|
||||
listener1.get().close();
|
||||
}
|
||||
assertThat(permits.getActiveOperationsCount(), equalTo(0));
|
||||
assertThat(permits.getActiveOperations(), emptyIterable());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -150,7 +150,6 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.hasToString;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
|
@ -282,14 +281,14 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
closeShards(indexShard);
|
||||
assertThat(indexShard.getActiveOperationsCount(), equalTo(0));
|
||||
try {
|
||||
indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX);
|
||||
indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX, "");
|
||||
fail("we should not be able to increment anymore");
|
||||
} catch (IndexShardClosedException e) {
|
||||
// expected
|
||||
}
|
||||
try {
|
||||
indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, null,
|
||||
ThreadPool.Names.INDEX);
|
||||
ThreadPool.Names.INDEX, "");
|
||||
fail("we should not be able to increment anymore");
|
||||
} catch (IndexShardClosedException e) {
|
||||
// expected
|
||||
|
@ -300,7 +299,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
IndexShard indexShard = newShard(false);
|
||||
expectThrows(IndexShardNotStartedException.class, () ->
|
||||
indexShard.acquireReplicaOperationPermit(indexShard.getPrimaryTerm() + randomIntBetween(1, 100),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX));
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, null, ThreadPool.Names.INDEX, ""));
|
||||
closeShards(indexShard);
|
||||
}
|
||||
|
||||
|
@ -313,6 +312,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
final CountDownLatch operationLatch = new CountDownLatch(1);
|
||||
final List<Thread> threads = new ArrayList<>();
|
||||
for (int i = 0; i < operations; i++) {
|
||||
final String id = "t_" + i;
|
||||
final Thread thread = new Thread(() -> {
|
||||
try {
|
||||
barrier.await();
|
||||
|
@ -339,7 +339,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
},
|
||||
ThreadPool.Names.INDEX);
|
||||
ThreadPool.Names.INDEX, id);
|
||||
});
|
||||
thread.start();
|
||||
threads.add(thread);
|
||||
|
@ -369,6 +369,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
final AtomicLong counter = new AtomicLong();
|
||||
final List<Thread> delayedThreads = new ArrayList<>();
|
||||
for (int i = 0; i < delayedOperations; i++) {
|
||||
final String id = "d_" + i;
|
||||
final Thread thread = new Thread(() -> {
|
||||
try {
|
||||
delayedOperationsBarrier.await();
|
||||
|
@ -389,7 +390,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
},
|
||||
ThreadPool.Names.INDEX);
|
||||
ThreadPool.Names.INDEX, id);
|
||||
});
|
||||
thread.start();
|
||||
delayedThreads.add(thread);
|
||||
|
@ -503,7 +504,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
},
|
||||
ThreadPool.Names.GENERIC);
|
||||
ThreadPool.Names.GENERIC, "");
|
||||
|
||||
latch.await();
|
||||
assertThat(indexShard.getLocalCheckpoint(), equalTo((long) maxSeqNo));
|
||||
|
@ -547,7 +548,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
throw new RuntimeException(e);
|
||||
}
|
||||
},
|
||||
ThreadPool.Names.GENERIC);
|
||||
ThreadPool.Names.GENERIC, "");
|
||||
|
||||
latch.await();
|
||||
assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(currentTranslogGeneration + 1));
|
||||
|
@ -580,7 +581,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
assertEquals(0, indexShard.getActiveOperationsCount());
|
||||
if (indexShard.routingEntry().isRelocationTarget() == false) {
|
||||
try {
|
||||
indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.INDEX);
|
||||
indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), null, ThreadPool.Names.INDEX, "");
|
||||
fail("shard shouldn't accept operations as replica");
|
||||
} catch (IllegalStateException ignored) {
|
||||
|
||||
|
@ -599,14 +600,14 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
|
||||
private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException {
|
||||
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
|
||||
indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX);
|
||||
indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX, "");
|
||||
return fut.get();
|
||||
}
|
||||
|
||||
private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard, long opPrimaryTerm)
|
||||
throws ExecutionException, InterruptedException {
|
||||
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
|
||||
indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.INDEX);
|
||||
indexShard.acquireReplicaOperationPermit(opPrimaryTerm, indexShard.getGlobalCheckpoint(), fut, ThreadPool.Names.INDEX, "");
|
||||
return fut.get();
|
||||
}
|
||||
|
||||
|
@ -653,7 +654,8 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
assertEquals(0, indexShard.getActiveOperationsCount());
|
||||
if (shardRouting.primary() == false) {
|
||||
final IllegalStateException e =
|
||||
expectThrows(IllegalStateException.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX));
|
||||
expectThrows(IllegalStateException.class,
|
||||
() -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.INDEX, ""));
|
||||
assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary")));
|
||||
}
|
||||
|
||||
|
@ -690,7 +692,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
};
|
||||
|
||||
indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO, onLockAcquired,
|
||||
ThreadPool.Names.INDEX);
|
||||
ThreadPool.Names.INDEX, "");
|
||||
|
||||
assertFalse(onResponse.get());
|
||||
assertTrue(onFailure.get());
|
||||
|
@ -762,7 +764,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
newPrimaryTerm,
|
||||
newGlobalCheckPoint,
|
||||
listener,
|
||||
ThreadPool.Names.SAME);
|
||||
ThreadPool.Names.SAME, "");
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
@ -900,7 +902,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
|
||||
}
|
||||
},
|
||||
ThreadPool.Names.SAME);
|
||||
ThreadPool.Names.SAME, "");
|
||||
|
||||
latch.await();
|
||||
|
||||
|
@ -954,7 +956,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
|
||||
}
|
||||
},
|
||||
ThreadPool.Names.SAME);
|
||||
ThreadPool.Names.SAME, "");
|
||||
|
||||
latch.await();
|
||||
if (globalCheckpointOnReplica == SequenceNumbers.UNASSIGNED_SEQ_NO
|
||||
|
@ -1006,7 +1008,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
latch.countDown();
|
||||
}
|
||||
},
|
||||
ThreadPool.Names.INDEX);
|
||||
ThreadPool.Names.INDEX, "");
|
||||
};
|
||||
|
||||
final long firstIncrement = 1 + (randomBoolean() ? 0 : 1);
|
||||
|
@ -1367,7 +1369,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
super.onResponse(releasable);
|
||||
}
|
||||
};
|
||||
shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.INDEX);
|
||||
shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.INDEX, "i_" + i);
|
||||
onLockAcquiredActions.add(onLockAcquired);
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.elasticsearch.common.UUIDs;
|
|||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.ShardNotFoundException;
|
||||
|
@ -38,7 +37,6 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
|
||||
|
||||
|
@ -115,7 +113,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase {
|
|||
SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class);
|
||||
final ShardId shardId = shard.shardId();
|
||||
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
|
||||
shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX);
|
||||
shard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.INDEX, "");
|
||||
try (Releasable operationLock = fut.get()) {
|
||||
SyncedFlushUtil.LatchedListener<ShardsSyncedFlushResult> listener = new SyncedFlushUtil.LatchedListener<>();
|
||||
flushService.attemptSyncedFlush(shardId, listener);
|
||||
|
|
|
@ -88,6 +88,7 @@ import static java.util.Collections.emptySet;
|
|||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -400,7 +401,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
|
|||
doAnswer(invocation -> {
|
||||
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {});
|
||||
return null;
|
||||
}).when(shard).acquirePrimaryOperationPermit(any(), anyString());
|
||||
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject());
|
||||
final AtomicBoolean phase1Called = new AtomicBoolean();
|
||||
final AtomicBoolean prepareTargetForTranslogCalled = new AtomicBoolean();
|
||||
final AtomicBoolean phase2Called = new AtomicBoolean();
|
||||
|
|
|
@ -28,11 +28,10 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
|
||||
import org.elasticsearch.action.support.replication.ReplicationTask;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
|
@ -66,8 +65,6 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.discovery.zen.ElectMasterService;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
|
@ -93,8 +90,6 @@ import org.elasticsearch.node.NodeValidationException;
|
|||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.MockTransportClient;
|
||||
|
@ -1106,7 +1101,7 @@ public final class InternalTestCluster extends TestCluster {
|
|||
// test that have ongoing write operations after the test (for example because ttl is used
|
||||
// and not all docs have been purged after the test) and inherit from
|
||||
// ElasticsearchIntegrationTest must override beforeIndexDeletion() to avoid failures.
|
||||
assertShardIndexCounter();
|
||||
assertNoPendingIndexOperations();
|
||||
//check that shards that have same sync id also contain same number of documents
|
||||
assertSameSyncIdSameDocs();
|
||||
assertOpenTranslogReferences();
|
||||
|
@ -1136,30 +1131,19 @@ public final class InternalTestCluster extends TestCluster {
|
|||
}
|
||||
}
|
||||
|
||||
private void assertShardIndexCounter() throws Exception {
|
||||
private void assertNoPendingIndexOperations() throws Exception {
|
||||
assertBusy(() -> {
|
||||
final Collection<NodeAndClient> nodesAndClients = nodes.values();
|
||||
for (NodeAndClient nodeAndClient : nodesAndClients) {
|
||||
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
|
||||
for (IndexService indexService : indexServices) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
int activeOperationsCount = indexShard.getActiveOperationsCount();
|
||||
if (activeOperationsCount > 0) {
|
||||
TaskManager taskManager = getInstance(TransportService.class, nodeAndClient.name).getTaskManager();
|
||||
DiscoveryNode localNode = getInstance(ClusterService.class, nodeAndClient.name).localNode();
|
||||
List<TaskInfo> taskInfos = taskManager.getTasks().values().stream()
|
||||
.filter(task -> task instanceof ReplicationTask)
|
||||
.map(task -> task.taskInfo(localNode.getId(), true))
|
||||
.collect(Collectors.toList());
|
||||
ListTasksResponse response = new ListTasksResponse(taskInfos, Collections.emptyList(), Collections.emptyList());
|
||||
try {
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint().value(response);
|
||||
throw new AssertionError("expected index shard counter on shard " + indexShard.shardId() + " on node " +
|
||||
nodeAndClient.name + " to be 0 but was " + activeOperationsCount + ". Current replication tasks on node:\n" +
|
||||
builder.string());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("caught exception while building response [" + response + "]", e);
|
||||
}
|
||||
List<Throwable> operations = indexShard.getActiveOperations();
|
||||
if (operations.size() > 0) {
|
||||
throw new AssertionError(
|
||||
"shard " + indexShard.shardId() + " on node [" + nodeAndClient.name + "] has pending operations:\n" +
|
||||
operations.stream().map(e -> "--> " + ExceptionsHelper.stackTrace(e)).collect(Collectors.joining("\n"))
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue