IndexShardOperationPermits: shouldn't use new Throwable to capture stack traces (#28598)
The is a follow up to #28567 changing the method used to capture stack traces, as requested during the review. Instead of creating a throwable, we explicitly capture the stack trace of the current thread. This should Make Jason Happy Again ™️ .
This commit is contained in:
parent
37e938f9de
commit
4aece92b2c
|
@ -30,13 +30,13 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
|
@ -45,7 +45,6 @@ import java.util.Locale;
|
|||
import java.util.Optional;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class Netty4Utils {
|
||||
|
||||
|
@ -182,8 +181,7 @@ public class Netty4Utils {
|
|||
*/
|
||||
try {
|
||||
// try to log the current stack trace
|
||||
final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
|
||||
final String formatted = Arrays.stream(stackTrace).skip(1).map(e -> "\tat " + e).collect(Collectors.joining("\n"));
|
||||
final String formatted = ExceptionsHelper.formatStackTrace(Thread.currentThread().getStackTrace());
|
||||
final Logger logger = ESLoggerFactory.getLogger(Netty4Utils.class);
|
||||
logger.error("fatal error on the network layer\n{}", formatted);
|
||||
} finally {
|
||||
|
|
|
@ -33,9 +33,11 @@ import java.io.IOException;
|
|||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public final class ExceptionsHelper {
|
||||
|
||||
|
@ -122,6 +124,10 @@ public final class ExceptionsHelper {
|
|||
return stackTraceStringWriter.toString();
|
||||
}
|
||||
|
||||
public static String formatStackTrace(final StackTraceElement[] stackTrace) {
|
||||
return Arrays.stream(stackTrace).skip(1).map(e -> "\tat " + e).collect(Collectors.joining("\n"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Rethrows the first exception in the list and adds all remaining to the suppressed list.
|
||||
* If the given list is empty no exception is thrown
|
||||
|
|
|
@ -2317,10 +2317,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
}
|
||||
|
||||
/**
|
||||
* @return a list of containing an exception for each operation permit that wasn't released yet. The stack traces of the exceptions
|
||||
* was captured when the operation acquired the permit and their message contains the debug information supplied at the time.
|
||||
* @return a list of describing each permit that wasn't released yet. The description consist of the debugInfo supplied
|
||||
* when the permit was acquired plus a stack traces that was captured when the permit was request.
|
||||
*/
|
||||
public List<Throwable> getActiveOperations() {
|
||||
public List<String> getActiveOperations() {
|
||||
return indexShardOperationPermits.getActiveOperations();
|
||||
}
|
||||
|
||||
|
|
|
@ -21,9 +21,11 @@ package org.elasticsearch.index.shard;
|
|||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Assertions;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ContextPreservingActionListener;
|
||||
import org.elasticsearch.common.CheckedRunnable;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
|
||||
|
@ -40,6 +42,7 @@ import java.util.concurrent.TimeoutException;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Tracks shard operation permits. Each operation on the shard obtains a permit. When we need to block operations (e.g., to transition
|
||||
|
@ -58,9 +61,9 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
private volatile boolean closed;
|
||||
private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this
|
||||
|
||||
// only valid when assertions are enabled. Key is AtomicBoolean associated with each permit to ensure close once semantics. Value is an
|
||||
// exception with some extra info in the message + a stack trace of the acquirer
|
||||
private final Map<AtomicBoolean, Throwable> issuedPermits;
|
||||
// only valid when assertions are enabled. Key is AtomicBoolean associated with each permit to ensure close once semantics.
|
||||
// Value is a tuple, with a some debug information supplied by the caller and a stack trace of the acquiring thread
|
||||
private final Map<AtomicBoolean, Tuple<String, StackTraceElement[]>> issuedPermits;
|
||||
|
||||
/**
|
||||
* Construct operation permits for the specified shards.
|
||||
|
@ -196,7 +199,7 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
*/
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
for (DelayedOperation queuedAction : queuedActions) {
|
||||
acquire(queuedAction.listener, null, false, queuedAction.debugInfo);
|
||||
acquire(queuedAction.listener, null, false, queuedAction.debugInfo, queuedAction.stackTrace);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -221,17 +224,17 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
*/
|
||||
public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution,
|
||||
final Object debugInfo) {
|
||||
final Throwable debugInfoWithStackTrace;
|
||||
final StackTraceElement[] stackTrace;
|
||||
if (Assertions.ENABLED) {
|
||||
debugInfoWithStackTrace = new Throwable(debugInfo.toString());
|
||||
stackTrace = Thread.currentThread().getStackTrace();
|
||||
} else {
|
||||
debugInfoWithStackTrace = null;
|
||||
stackTrace = null;
|
||||
}
|
||||
acquire(onAcquired, executorOnDelay, forceExecution, debugInfoWithStackTrace);
|
||||
acquire(onAcquired, executorOnDelay, forceExecution, debugInfo, stackTrace);
|
||||
}
|
||||
|
||||
private void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution,
|
||||
final Throwable debugInfo) {
|
||||
final Object debugInfo, final StackTraceElement[] stackTrace) {
|
||||
if (closed) {
|
||||
onAcquired.onFailure(new IndexShardClosedException(shardId));
|
||||
return;
|
||||
|
@ -249,10 +252,10 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
} else {
|
||||
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
|
||||
}
|
||||
delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo));
|
||||
delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo, stackTrace));
|
||||
return;
|
||||
} else {
|
||||
releasable = acquire(debugInfo);
|
||||
releasable = acquire(debugInfo, stackTrace);
|
||||
}
|
||||
}
|
||||
} catch (final InterruptedException e) {
|
||||
|
@ -263,21 +266,21 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
onAcquired.onResponse(releasable);
|
||||
}
|
||||
|
||||
private Releasable acquire(Throwable debugInfo) throws InterruptedException {
|
||||
private Releasable acquire(Object debugInfo, StackTraceElement[] stackTrace) throws InterruptedException {
|
||||
assert Thread.holdsLock(this);
|
||||
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting
|
||||
final AtomicBoolean closed = new AtomicBoolean();
|
||||
final Releasable releasable = () -> {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
if (Assertions.ENABLED) {
|
||||
Throwable e = issuedPermits.remove(closed);
|
||||
assert e != null;
|
||||
Tuple<String, StackTraceElement[]> existing = issuedPermits.remove(closed);
|
||||
assert existing != null;
|
||||
}
|
||||
semaphore.release(1);
|
||||
}
|
||||
};
|
||||
if (Assertions.ENABLED) {
|
||||
issuedPermits.put(closed, debugInfo);
|
||||
issuedPermits.put(closed, new Tuple<>(debugInfo.toString(), stackTrace));
|
||||
}
|
||||
return releasable;
|
||||
} else {
|
||||
|
@ -306,23 +309,28 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
}
|
||||
|
||||
/**
|
||||
* @return a list of containing an exception for each permit that wasn't released yet. The stack traces of the exceptions
|
||||
* was captured when the operation acquired the permit and their message contains the debug information supplied at the time.
|
||||
* @return a list of describing each permit that wasn't released yet. The description consist of the debugInfo supplied
|
||||
* when the permit was acquired plus a stack traces that was captured when the permit was request.
|
||||
*/
|
||||
List<Throwable> getActiveOperations() {
|
||||
return new ArrayList<>(issuedPermits.values());
|
||||
List<String> getActiveOperations() {
|
||||
return issuedPermits.values().stream().map(
|
||||
t -> t.v1() + "\n" + ExceptionsHelper.formatStackTrace(t.v2()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private static class DelayedOperation {
|
||||
private final ActionListener<Releasable> listener;
|
||||
private final Throwable debugInfo;
|
||||
private final ActionListener<Releasable> listener;
|
||||
private final String debugInfo;
|
||||
private final StackTraceElement[] stackTrace;
|
||||
|
||||
private DelayedOperation(ActionListener<Releasable> listener, Throwable debugInfo) {
|
||||
private DelayedOperation(ActionListener<Releasable> listener, Object debugInfo, StackTraceElement[] stackTrace) {
|
||||
this.listener = listener;
|
||||
if (Assertions.ENABLED) {
|
||||
this.debugInfo = new Throwable("delayed", debugInfo);
|
||||
this.debugInfo = "[delayed] " + debugInfo;
|
||||
this.stackTrace = stackTrace;
|
||||
} else {
|
||||
this.debugInfo = null;
|
||||
this.stackTrace = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -662,21 +662,21 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
permits.acquire(listener2, null, false, "listener2");
|
||||
|
||||
assertThat(permits.getActiveOperationsCount(), equalTo(2));
|
||||
List<String> messages = permits.getActiveOperations().stream().map(Throwable::getMessage).collect(Collectors.toList());
|
||||
List<String> messages = permits.getActiveOperations().stream().collect(Collectors.toList());
|
||||
assertThat(messages, hasSize(2));
|
||||
assertThat(messages, containsInAnyOrder(Arrays.asList(containsString("listener1"), containsString("listener2"))));
|
||||
|
||||
if (randomBoolean()) {
|
||||
listener1.get().close();
|
||||
assertThat(permits.getActiveOperationsCount(), equalTo(1));
|
||||
messages = permits.getActiveOperations().stream().map(Throwable::getMessage).collect(Collectors.toList());
|
||||
messages = permits.getActiveOperations().stream().collect(Collectors.toList());
|
||||
assertThat(messages, hasSize(1));
|
||||
assertThat(messages, contains(containsString("listener2")));
|
||||
listener2.get().close();
|
||||
} else {
|
||||
listener2.get().close();
|
||||
assertThat(permits.getActiveOperationsCount(), equalTo(1));
|
||||
messages = permits.getActiveOperations().stream().map(Throwable::getMessage).collect(Collectors.toList());
|
||||
messages = permits.getActiveOperations().stream().collect(Collectors.toList());
|
||||
assertThat(messages, hasSize(1));
|
||||
assertThat(messages, contains(containsString("listener1")));
|
||||
listener1.get().close();
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
|
||||
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
|
||||
|
@ -1138,11 +1137,11 @@ public final class InternalTestCluster extends TestCluster {
|
|||
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
|
||||
for (IndexService indexService : indexServices) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
List<Throwable> operations = indexShard.getActiveOperations();
|
||||
List<String> operations = indexShard.getActiveOperations();
|
||||
if (operations.size() > 0) {
|
||||
throw new AssertionError(
|
||||
"shard " + indexShard.shardId() + " on node [" + nodeAndClient.name + "] has pending operations:\n" +
|
||||
operations.stream().map(e -> "--> " + ExceptionsHelper.stackTrace(e)).collect(Collectors.joining("\n"))
|
||||
"shard " + indexShard.shardId() + " on node [" + nodeAndClient.name + "] has pending operations:\n --> " +
|
||||
operations.stream().collect(Collectors.joining("\n --> "))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue