ThreadPool and ThreadContext are not closeable (#43249) (#49273)

This commit changes the ThreadContext to just use a regular ThreadLocal
over the lucene CloseableThreadLocal. The CloseableThreadLocal solves
issues with ThreadLocals that are no longer needed during runtime but
in the case of the ThreadContext, we need it for the runtime of the
node and it is typically not closed until the node closes, so we miss
out on the benefits that this class provides.

Additionally by removing the close logic, we simplify code in other
places that deal with exceptions and tracking to see if it happens when
the node is closing.

Closes #42577
This commit is contained in:
Jay Modi 2019-11-19 13:15:16 -07:00 committed by GitHub
parent c4c8a7a43c
commit eed4cd25eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 653 additions and 846 deletions

View File

@ -272,10 +272,9 @@ public class JsonLoggerTests extends ESTestCase {
public void testDuplicateLogMessages() throws IOException {
final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger("test"));
// For the same key and X-Opaque-ID deprecation should be once
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
try{
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader(Task.X_OPAQUE_ID, "ID1");
DeprecationLogger.setThreadContext(threadContext);
deprecationLogger.deprecatedAndMaybeLog("key", "message1");
@ -300,15 +299,14 @@ public class JsonLoggerTests extends ESTestCase {
)
);
}
}finally{
} finally {
DeprecationLogger.removeThreadContext(threadContext);
}
}
// For the same key and different X-Opaque-ID should be multiple times per key/x-opaque-id
//continuing with message1-ID1 in logs already, adding a new deprecation log line with message2-ID2
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
try{
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.putHeader(Task.X_OPAQUE_ID, "ID2");
DeprecationLogger.setThreadContext(threadContext);
deprecationLogger.deprecatedAndMaybeLog("key", "message1");
@ -343,11 +341,10 @@ public class JsonLoggerTests extends ESTestCase {
)
);
}
}finally{
} finally {
DeprecationLogger.removeThreadContext(threadContext);
}
}
}
private List<JsonLogLine> collectLines(Stream<JsonLogLine> stream) {
return stream

View File

@ -262,7 +262,6 @@ public class DeprecationLogger {
public String getXOpaqueId(Set<ThreadContext> threadContexts) {
return threadContexts.stream()
.filter(t -> t.isClosed() == false)
.filter(t -> t.getHeader(Task.X_OPAQUE_ID) != null)
.findFirst()
.map(t -> t.getHeader(Task.X_OPAQUE_ID))

View File

@ -106,17 +106,8 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
}
private boolean assertDefaultContext(Runnable r) {
try {
assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
} catch (IllegalStateException ex) {
// sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks
// this must not trigger an exception here since we only assert if the default is restored and
// we don't really care if we are closed
if (contextHolder.isClosed() == false) {
throw ex;
}
}
return true;
}

View File

@ -20,10 +20,10 @@ package org.elasticsearch.common.util.concurrent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -33,7 +33,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.tasks.Task;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -44,7 +43,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
@ -84,7 +82,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARN
* </pre>
*
*/
public final class ThreadContext implements Closeable, Writeable {
public final class ThreadContext implements Writeable {
public static final String PREFIX = "request.headers";
public static final Setting<Settings> DEFAULT_HEADERS_SETTING = Setting.groupSetting(PREFIX + ".", Property.NodeScope);
@ -97,7 +95,7 @@ public final class ThreadContext implements Closeable, Writeable {
private static final Logger logger = LogManager.getLogger(ThreadContext.class);
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct();
private final Map<String, String> defaultHeader;
private final ContextThreadLocal threadLocal;
private final ThreadLocal<ThreadContextStruct> threadLocal;
private final int maxWarningHeaderCount;
private final long maxWarningHeaderSize;
@ -106,26 +104,12 @@ public final class ThreadContext implements Closeable, Writeable {
* @param settings the settings to read the default request headers from
*/
public ThreadContext(Settings settings) {
Settings headers = DEFAULT_HEADERS_SETTING.get(settings);
if (headers == null) {
this.defaultHeader = Collections.emptyMap();
} else {
Map<String, String> defaultHeader = new HashMap<>();
for (String key : headers.names()) {
defaultHeader.put(key, headers.get(key));
}
this.defaultHeader = Collections.unmodifiableMap(defaultHeader);
}
threadLocal = new ContextThreadLocal();
this.defaultHeader = buildDefaultHeaders(settings);
this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT);
this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings);
this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
}
@Override
public void close() {
threadLocal.close();
}
/**
* Removes the current context and resets a default context. The removed context can be
* restored by closing the returned {@link StoredContext}.
@ -144,19 +128,13 @@ public final class ThreadContext implements Closeable, Writeable {
.immutableMap());
threadLocal.set(threadContextStruct);
} else {
threadLocal.set(null);
threadLocal.set(DEFAULT_CONTEXT);
}
return () -> {
// If the node and thus the threadLocal get closed while this task
// is still executing, we don't want this runnable to fail with an
// uncaught exception
try {
threadLocal.set(context);
} catch (IllegalStateException e) {
if (isClosed() == false) {
throw e;
}
}
};
}
@ -259,6 +237,19 @@ public final class ThreadContext implements Closeable, Writeable {
* Reads the headers from the stream into the current context
*/
public void readHeaders(StreamInput in) throws IOException {
final Tuple<Map<String, String>, Map<String, Set<String>>> streamTuple = readHeadersFromStream(in);
final Map<String, String> requestHeaders = streamTuple.v1();
final Map<String, Set<String>> responseHeaders = streamTuple.v2();
final ThreadContextStruct struct;
if (requestHeaders.isEmpty() && responseHeaders.isEmpty()) {
struct = ThreadContextStruct.EMPTY;
} else {
struct = new ThreadContextStruct(requestHeaders, responseHeaders, Collections.emptyMap(), false);
}
threadLocal.set(struct);
}
public static Tuple<Map<String, String>, Map<String, Set<String>>> readHeadersFromStream(StreamInput in) throws IOException {
final Map<String, String> requestHeaders = in.readMap(StreamInput::readString, StreamInput::readString);
final Map<String, Set<String>> responseHeaders = in.readMap(StreamInput::readString, input -> {
final int size = input.readVInt();
@ -277,13 +268,7 @@ public final class ThreadContext implements Closeable, Writeable {
return values;
}
});
final ThreadContextStruct struct;
if (requestHeaders.isEmpty() && responseHeaders.isEmpty()) {
struct = ThreadContextStruct.EMPTY;
} else {
struct = new ThreadContextStruct(requestHeaders, responseHeaders, Collections.emptyMap(), false);
}
threadLocal.set(struct);
return new Tuple<>(requestHeaders, responseHeaders);
}
/**
@ -377,17 +362,7 @@ public final class ThreadContext implements Closeable, Writeable {
* @param uniqueValue the function that produces de-duplication values
*/
public void addResponseHeader(final String key, final String value, final Function<String, String> uniqueValue) {
/*
* Updating the thread local is expensive due to a shared reference that we synchronize on, so we should only do it if the thread
* context struct changed. It will not change if we de-duplicate this value to an existing one, or if we don't add a new one because
* we have reached capacity.
*/
final ThreadContextStruct current = threadLocal.get();
final ThreadContextStruct maybeNext =
current.putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize);
if (current != maybeNext) {
threadLocal.set(maybeNext);
}
threadLocal.set(threadLocal.get().putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize));
}
/**
@ -439,13 +414,6 @@ public final class ThreadContext implements Closeable, Writeable {
return threadLocal.get().isSystemContext;
}
/**
* Returns <code>true</code> if the context is closed, otherwise <code>true</code>
*/
public boolean isClosed() {
return threadLocal.closed.get();
}
@FunctionalInterface
public interface StoredContext extends AutoCloseable {
@Override
@ -456,6 +424,19 @@ public final class ThreadContext implements Closeable, Writeable {
}
}
public static Map<String, String> buildDefaultHeaders(Settings settings) {
Settings headers = DEFAULT_HEADERS_SETTING.get(settings);
if (headers == null) {
return Collections.emptyMap();
} else {
Map<String, String> defaultHeader = new HashMap<>();
for (String key : headers.names()) {
defaultHeader.put(key, headers.get(key));
}
return Collections.unmodifiableMap(defaultHeader);
}
}
private static final class ThreadContextStruct {
private static final ThreadContextStruct EMPTY =
@ -633,55 +614,6 @@ public final class ThreadContext implements Closeable, Writeable {
}
}
private static class ContextThreadLocal extends CloseableThreadLocal<ThreadContextStruct> {
private final AtomicBoolean closed = new AtomicBoolean(false);
@Override
public void set(ThreadContextStruct object) {
try {
if (object == DEFAULT_CONTEXT) {
super.set(null);
} else {
super.set(object);
}
} catch (NullPointerException ex) {
/* This is odd but CloseableThreadLocal throws a NPE if it was closed but still accessed.
to get a real exception we call ensureOpen() to tell the user we are already closed.*/
ensureOpen();
throw ex;
}
}
@Override
public ThreadContextStruct get() {
try {
ThreadContextStruct threadContextStruct = super.get();
if (threadContextStruct != null) {
return threadContextStruct;
}
return DEFAULT_CONTEXT;
} catch (NullPointerException ex) {
/* This is odd but CloseableThreadLocal throws a NPE if it was closed but still accessed.
to get a real exception we call ensureOpen() to tell the user we are already closed.*/
ensureOpen();
throw ex;
}
}
private void ensureOpen() {
if (closed.get()) {
throw new IllegalStateException("threadcontext is already closed");
}
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
super.close();
}
}
}
/**
* Wraps a Runnable to preserve the thread context.
*/
@ -696,19 +628,9 @@ public final class ThreadContext implements Closeable, Writeable {
@Override
public void run() {
boolean whileRunning = false;
try (ThreadContext.StoredContext ignore = stashContext()){
ctx.restore();
whileRunning = true;
in.run();
whileRunning = false;
} catch (IllegalStateException ex) {
if (whileRunning || threadLocal.closed.get() == false) {
throw ex;
}
// if we hit an ISE here we have been shutting down
// this comes from the threadcontext and barfs if
// our threadpool has been shutting down
}
}
@ -765,21 +687,9 @@ public final class ThreadContext implements Closeable, Writeable {
@Override
protected void doRun() throws Exception {
boolean whileRunning = false;
threadsOriginalContext = stashContext();
try {
creatorsContext.restore();
whileRunning = true;
in.doRun();
whileRunning = false;
} catch (IllegalStateException ex) {
if (whileRunning || threadLocal.closed.get() == false) {
throw ex;
}
// if we hit an ISE here we have been shutting down
// this comes from the threadcontext and barfs if
// our threadpool has been shutting down
}
}
@Override

View File

@ -40,7 +40,6 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.node.Node;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@ -60,7 +59,7 @@ import java.util.stream.Collectors;
import static java.util.Collections.unmodifiableMap;
public class ThreadPool implements Scheduler, Closeable {
public class ThreadPool implements Scheduler {
private static final Logger logger = LogManager.getLogger(ThreadPool.class);
@ -754,7 +753,6 @@ public class ThreadPool implements Scheduler, Closeable {
public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit) {
if (pool != null) {
// Leverage try-with-resources to close the threadpool
try (ThreadPool c = pool) {
pool.shutdown();
if (awaitTermination(pool, timeout, timeUnit)) {
return true;
@ -763,7 +761,6 @@ public class ThreadPool implements Scheduler, Closeable {
pool.shutdownNow();
return awaitTermination(pool, timeout, timeUnit);
}
}
return false;
}
@ -781,11 +778,6 @@ public class ThreadPool implements Scheduler, Closeable {
return false;
}
@Override
public void close() {
threadContext.close();
}
public ThreadContext getThreadContext() {
return threadContext;
}

View File

@ -25,7 +25,6 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.NotCompressedException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
@ -94,9 +93,8 @@ public final class TransportLogger {
streamInput = compressor.streamInput(streamInput);
}
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
context.readHeaders(streamInput);
}
// read and discard headers
ThreadContext.readHeadersFromStream(streamInput);
// now we decode the features
if (streamInput.getVersion().onOrAfter(Version.V_6_3_0)) {
streamInput.readStringArray();

View File

@ -28,7 +28,7 @@ import java.io.IOException;
public class ContextPreservingActionListenerTests extends ESTestCase {
public void testOriginalContextIsPreservedAfterOnResponse() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final boolean nonEmptyContext = randomBoolean();
if (nonEmptyContext) {
threadContext.putHeader("not empty", "value");
@ -63,10 +63,9 @@ public class ContextPreservingActionListenerTests extends ESTestCase {
assertNull(threadContext.getHeader("foo"));
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
}
}
public void testOriginalContextIsPreservedAfterOnFailure() throws Exception {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final boolean nonEmptyContext = randomBoolean();
if (nonEmptyContext) {
threadContext.putHeader("not empty", "value");
@ -102,11 +101,10 @@ public class ContextPreservingActionListenerTests extends ESTestCase {
assertNull(threadContext.getHeader("foo"));
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
}
}
}
public void testOriginalContextIsWhenListenerThrows() throws Exception {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final boolean nonEmptyContext = randomBoolean();
if (nonEmptyContext) {
threadContext.putHeader("not empty", "value");
@ -152,5 +150,4 @@ public class ContextPreservingActionListenerTests extends ESTestCase {
assertNull(threadContext.getHeader("foo"));
assertEquals(nonEmptyContext ? "value" : null, threadContext.getHeader("not empty"));
}
}
}

View File

@ -42,7 +42,6 @@ import java.security.Permissions;
import java.security.PrivilegedAction;
import java.security.ProtectionDomain;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -78,7 +77,7 @@ public class DeprecationLoggerTests extends ESTestCase {
}
public void testAddsHeaderWithThreadContext() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
final String param = randomAlphaOfLengthBetween(1, 5);
@ -92,10 +91,9 @@ public class DeprecationLoggerTests extends ESTestCase {
assertThat(responses.get(0), warningValueMatcher);
assertThat(responses.get(0), containsString("\"A simple message [" + param + "]\""));
}
}
public void testContainingNewline() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
logger.deprecated(threadContexts, "this message contains a newline\n");
@ -108,10 +106,9 @@ public class DeprecationLoggerTests extends ESTestCase {
assertThat(responses.get(0), warningValueMatcher);
assertThat(responses.get(0), containsString("\"this message contains a newline%0A\""));
}
}
public void testSurrogatePair() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
logger.deprecated(threadContexts, "this message contains a surrogate pair 😱");
@ -140,10 +137,9 @@ public class DeprecationLoggerTests extends ESTestCase {
assertThat(sb.toString(), equalTo("%F0%9F%98%B1"));
assertThat(responses.get(0), containsString("\"this message contains a surrogate pair %F0%9F%98%B1\""));
}
}
public void testAddsCombinedHeaderWithThreadContext() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
final String param = randomAlphaOfLengthBetween(1, 5);
@ -163,13 +159,12 @@ public class DeprecationLoggerTests extends ESTestCase {
assertThat(responses.get(1), warningValueMatcher);
assertThat(responses.get(1), containsString("\"" + second + "\""));
}
}
public void testCanRemoveThreadContext() throws IOException {
final String expected = "testCanRemoveThreadContext";
final String unexpected = "testCannotRemoveThreadContext";
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
DeprecationLogger.setThreadContext(threadContext);
logger.deprecated(expected);
@ -195,20 +190,6 @@ public class DeprecationLoggerTests extends ESTestCase {
assertThat(responses.get(0), not(containsString(unexpected)));
}
}
}
public void testIgnoresClosedThreadContext() throws IOException {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Set<ThreadContext> threadContexts = new HashSet<>(1);
threadContexts.add(threadContext);
threadContext.close();
logger.deprecated(threadContexts, "Ignored logger message");
assertTrue(threadContexts.contains(threadContext));
}
public void testSafeWithoutThreadContext() {
logger.deprecated(Collections.emptySet(), "Ignored");
@ -219,7 +200,7 @@ public class DeprecationLoggerTests extends ESTestCase {
}
public void testFailsWhenDoubleSettingSameThreadContext() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
DeprecationLogger.setThreadContext(threadContext);
try {
@ -229,13 +210,11 @@ public class DeprecationLoggerTests extends ESTestCase {
DeprecationLogger.removeThreadContext(threadContext);
}
}
}
public void testFailsWhenRemovingUnknownThreadContext() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
expectThrows(IllegalStateException.class, () -> DeprecationLogger.removeThreadContext(threadContext));
}
}
public void testWarningValueFromWarningHeader() throws InterruptedException {
final String s = randomAlphaOfLength(16);
@ -274,7 +253,7 @@ public class DeprecationLoggerTests extends ESTestCase {
Settings settings = Settings.builder()
.put("http.max_warning_header_count", maxWarningHeaderCount)
.build();
try (ThreadContext threadContext = new ThreadContext(settings)) {
ThreadContext threadContext = new ThreadContext(settings);
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
// try to log three warning messages
logger.deprecated(threadContexts, "A simple message 1");
@ -289,7 +268,6 @@ public class DeprecationLoggerTests extends ESTestCase {
assertThat(responses.get(1), warningValueMatcher);
assertThat(responses.get(1), containsString("\"A simple message 2"));
}
}
public void testWarningHeaderSizeSetting() throws IOException{
// Test that the size of warning headers don't exceed 'http.max_warning_header_size'
@ -302,7 +280,7 @@ public class DeprecationLoggerTests extends ESTestCase {
String message2 = new String(arr, StandardCharsets.UTF_8) + "2";
String message3 = new String(arr, StandardCharsets.UTF_8) + "3";
try (ThreadContext threadContext = new ThreadContext(settings)) {
ThreadContext threadContext = new ThreadContext(settings);
final Set<ThreadContext> threadContexts = Collections.singleton(threadContext);
// try to log three warning messages
logger.deprecated(threadContexts, message1);
@ -319,7 +297,6 @@ public class DeprecationLoggerTests extends ESTestCase {
// assert that the size of all warning headers is less or equal to 1Kb
assertTrue(warningHeadersSize <= 1024);
}
}
@SuppressLoggerChecks(reason = "Safe as this is using mockito")
public void testLogPermissions() {
AtomicBoolean supplierCalled = new AtomicBoolean(false);

View File

@ -21,7 +21,6 @@ package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
@ -47,11 +46,6 @@ public class AsyncIOProcessorTests extends ESTestCase {
threadContext = new ThreadContext(Settings.EMPTY);
}
@After
public void tearDownThreadContext() {
threadContext.close();
}
public void testPut() throws InterruptedException {
boolean blockInternal = randomBoolean();
AtomicInteger received = new AtomicInteger(0);

View File

@ -64,7 +64,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}
public void testAutoQueueSizingUp() throws Exception {
@ -93,7 +92,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}
public void testAutoQueueSizingDown() throws Exception {
@ -121,7 +119,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}
public void testAutoQueueSizingWithMin() throws Exception {
@ -151,7 +148,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}
public void testAutoQueueSizingWithMax() throws Exception {
@ -181,7 +177,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
});
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}
public void testExecutionEWMACalculation() throws Exception {
@ -222,7 +217,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}
/** Use a runnable wrapper that simulates a task with unknown failures. */
@ -244,7 +238,6 @@ public class QueueResizingEsThreadPoolExecutorTests extends ESTestCase {
executeTask(executor, 1);
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}
private Function<Runnable, WrappedRunnable> fastWrapper() {

View File

@ -244,35 +244,6 @@ public class ThreadContextTests extends ESTestCase {
assertEquals("bar", threadContext.getHeader("foo"));
}
public void testAccessClosed() throws IOException {
Settings build = Settings.builder().put("request.headers.default", "1").build();
ThreadContext threadContext = new ThreadContext(build);
threadContext.putHeader("foo", "bar");
threadContext.putTransient("ctx.foo", 1);
threadContext.close();
try {
threadContext.getHeader("foo");
fail();
} catch (IllegalStateException ise) {
assertEquals("threadcontext is already closed", ise.getMessage());
}
try {
threadContext.putTransient("foo", new Object());
fail();
} catch (IllegalStateException ise) {
assertEquals("threadcontext is already closed", ise.getMessage());
}
try {
threadContext.putHeader("boom", "boom");
fail();
} catch (IllegalStateException ise) {
assertEquals("threadcontext is already closed", ise.getMessage());
}
}
public void testSerialize() throws IOException {
Settings build = Settings.builder().put("request.headers.default", "1").build();
ThreadContext threadContext = new ThreadContext(build);
@ -397,7 +368,7 @@ public class ThreadContextTests extends ESTestCase {
}
public void testPreserveContext() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Runnable withContext;
// Create a runnable that should run with some header
@ -417,10 +388,9 @@ public class ThreadContextTests extends ESTestCase {
// but not after
assertNull(threadContext.getHeader("foo"));
}
}
public void testPreserveContextKeepsOriginalContextWhenCalledTwice() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Runnable originalWithContext;
Runnable withContext;
@ -445,10 +415,9 @@ public class ThreadContextTests extends ESTestCase {
// In fact the second wrapping didn't even change it
assertThat(withContext, sameInstance(originalWithContext));
}
}
public void testPreservesThreadsOriginalContextOnRunException() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Runnable withContext;
// create a abstract runnable, add headers and transient objects and verify in the methods
@ -536,10 +505,9 @@ public class ThreadContextTests extends ESTestCase {
assertNull(threadContext.getTransient("run"));
assertTrue(threadContext.isDefaultContext());
}
}
public void testPreservesThreadsOriginalContextOnFailureException() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Runnable withContext;
// a runnable that throws from onFailure
@ -577,10 +545,9 @@ public class ThreadContextTests extends ESTestCase {
assertNull(threadContext.getTransient("foo"));
assertTrue(threadContext.isDefaultContext());
}
}
public void testPreservesThreadsOriginalContextOnAfterException() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Runnable withContext;
// a runnable that throws from onAfter
@ -623,10 +590,9 @@ public class ThreadContextTests extends ESTestCase {
assertNull(threadContext.getTransient("foo"));
assertTrue(threadContext.isDefaultContext());
}
}
public void testMarkAsSystemContext() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
assertFalse(threadContext.isSystemContext());
try (ThreadContext.StoredContext context = threadContext.stashContext()) {
assertFalse(threadContext.isSystemContext());
@ -635,7 +601,6 @@ public class ThreadContextTests extends ESTestCase {
}
assertFalse(threadContext.isSystemContext());
}
}
public void testPutHeaders() {
Settings build = Settings.builder().put("request.headers.default", "1").build();

View File

@ -36,7 +36,6 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.nio.file.Path;
@ -44,12 +43,14 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
@LuceneTestCase.SuppressFileSystems(value = "ExtrasFS")
public class NodeTests extends ESTestCase {
@ -166,7 +167,6 @@ public class NodeTests extends ESTestCase {
assertTrue(node.awaitClose(1, TimeUnit.DAYS));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42577")
public void testCloseRaceWithTaskExecution() throws Exception {
Node node = new MockNode(baseSettings().build(), basePlugins());
node.start();
@ -180,9 +180,13 @@ public class NodeTests extends ESTestCase {
} catch (InterruptedException e) {
throw new AssertionError("interrupted while waiting", e);
}
try {
threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> {
while (shouldRun.get());
});
} catch (RejectedExecutionException e) {
assertThat(e.getMessage(), containsString("[Terminated,"));
}
});
Thread closeThread = new Thread(() -> {
running.countDown();
@ -268,7 +272,7 @@ public class NodeTests extends ESTestCase {
IllegalStateException e = expectThrows(IllegalStateException.class, () -> node.awaitClose(1, TimeUnit.DAYS));
searcher.close();
assertThat(e.getMessage(), Matchers.containsString("Something is leaking index readers or store references"));
assertThat(e.getMessage(), containsString("Something is leaking index readers or store references"));
}
public void testCloseOnLeakedStoreReference() throws Exception {
@ -284,6 +288,6 @@ public class NodeTests extends ESTestCase {
IllegalStateException e = expectThrows(IllegalStateException.class, () -> node.awaitClose(1, TimeUnit.DAYS));
shard.store().decRef();
assertThat(e.getMessage(), Matchers.containsString("Something is leaking index readers or store references"));
assertThat(e.getMessage(), containsString("Something is leaking index readers or store references"));
}
}

View File

@ -62,8 +62,7 @@ public class ThreadPoolTests extends ESTestCase {
// the delta can be large, we just care it is the same order of magnitude
assertTrue("thread pool cached absolute time " + gotTime + " is too far from real current time " + currentTime, delta < 10000);
} finally {
threadPool.shutdown();
threadPool.close();
terminate(threadPool);
}
}

View File

@ -90,9 +90,8 @@ public class TransportLoggerTests extends ESTestCase {
private BytesReference buildRequest() throws IOException {
try (BytesStreamOutput messageOutput = new BytesStreamOutput()) {
messageOutput.setVersion(Version.CURRENT);
try (ThreadContext context = new ThreadContext(Settings.EMPTY)) {
ThreadContext context = new ThreadContext(Settings.EMPTY);
context.writeTo(messageOutput);
}
messageOutput.writeStringArray(new String[0]);
messageOutput.writeString(ClusterStatsAction.NAME);
new ClusterStatsRequest().writeTo(messageOutput);

View File

@ -374,7 +374,8 @@ public abstract class ESTestCase extends LuceneTestCase {
// initialized
if (threadContext != null) {
ensureNoWarnings();
assert threadContext == null;
DeprecationLogger.removeThreadContext(threadContext);
threadContext = null;
}
ensureAllSearchContextsReleased();
ensureCheckIndexPassed();
@ -403,7 +404,7 @@ public abstract class ESTestCase extends LuceneTestCase {
assertNull("unexpected warning headers", warnings);
}
} finally {
resetDeprecationLogger(false);
resetDeprecationLogger();
}
}
@ -441,7 +442,7 @@ public abstract class ESTestCase extends LuceneTestCase {
assertWarnings(actualWarnings, expectedWarnings);
}
} finally {
resetDeprecationLogger(true);
resetDeprecationLogger();
}
}
@ -464,21 +465,11 @@ public abstract class ESTestCase extends LuceneTestCase {
}
/**
* Reset the deprecation logger by removing the current thread context, and setting a new thread context if {@code setNewThreadContext}
* is set to {@code true} and otherwise clearing the current thread context.
*
* @param setNewThreadContext whether or not to attach a new thread context to the deprecation logger
* Reset the deprecation logger by clearing the current thread context.
*/
private void resetDeprecationLogger(final boolean setNewThreadContext) {
// "clear" current warning headers by setting a new ThreadContext
DeprecationLogger.removeThreadContext(this.threadContext);
this.threadContext.close();
if (setNewThreadContext) {
this.threadContext = new ThreadContext(Settings.EMPTY);
DeprecationLogger.setThreadContext(this.threadContext);
} else {
this.threadContext = null;
}
private void resetDeprecationLogger() {
// "clear" context by stashing current values and dropping the returned StoredContext
threadContext.stashContext();
}
private static final List<StatusData> statusData = new ArrayList<>();

View File

@ -863,14 +863,13 @@ public abstract class ESRestTestCase extends ESTestCase {
throw new RuntimeException("Error setting up ssl", e);
}
}
try (ThreadContext threadContext = new ThreadContext(settings)) {
Header[] defaultHeaders = new Header[threadContext.getHeaders().size()];
Map<String, String> headers = ThreadContext.buildDefaultHeaders(settings);
Header[] defaultHeaders = new Header[headers.size()];
int i = 0;
for (Map.Entry<String, String> entry : threadContext.getHeaders().entrySet()) {
for (Map.Entry<String, String> entry : headers.entrySet()) {
defaultHeaders[i++] = new BasicHeader(entry.getKey(), entry.getValue());
}
builder.setDefaultHeaders(defaultHeaders);
}
final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT);
final TimeValue socketTimeout =
TimeValue.parseTimeValue(socketTimeoutString == null ? "60s" : socketTimeoutString, CLIENT_SOCKET_TIMEOUT);

View File

@ -32,6 +32,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@ -83,7 +84,7 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
assertThat(e.getMessage(), containsString("no such repository [repo]"));
}
public void testNothingScheduledWhenNotRunning() {
public void testNothingScheduledWhenNotRunning() throws InterruptedException {
ClockMock clock = new ClockMock();
SnapshotLifecyclePolicyMetadata initialPolicy = SnapshotLifecyclePolicyMetadata.builder()
.setPolicy(createPolicy("initial", "*/1 * * * * ?"))
@ -94,8 +95,8 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
ClusterState initialState = createState(new SnapshotLifecycleMetadata(
Collections.singletonMap(initialPolicy.getPolicy().getId(), initialPolicy),
OperationMode.RUNNING, new SnapshotLifecycleStats()));
try (ThreadPool threadPool = new TestThreadPool("test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(initialState, threadPool);
ThreadPool threadPool = new TestThreadPool("test");
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(initialState, threadPool);
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
() -> new FakeSnapshotTask(e -> logger.info("triggered")), clusterService, clock)) {
@ -140,8 +141,9 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
sls.onMaster();
sls.clusterChanged(new ClusterChangedEvent("1", state, emptyState));
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
} finally {
threadPool.shutdownNow();
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
}
@ -154,8 +156,8 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
ClockMock clock = new ClockMock();
final AtomicInteger triggerCount = new AtomicInteger(0);
final AtomicReference<Consumer<SchedulerEngine.Event>> trigger = new AtomicReference<>(e -> triggerCount.incrementAndGet());
try (ThreadPool threadPool = new TestThreadPool("test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
ThreadPool threadPool = new TestThreadPool("test");
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
() -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
@ -250,8 +252,9 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
// Signify becoming non-master, the jobs should all be cancelled
sls.offMaster();
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
} finally {
threadPool.shutdownNow();
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
}
@ -262,8 +265,8 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
ClockMock clock = new ClockMock();
final AtomicInteger triggerCount = new AtomicInteger(0);
final AtomicReference<Consumer<SchedulerEngine.Event>> trigger = new AtomicReference<>(e -> triggerCount.incrementAndGet());
try (ThreadPool threadPool = new TestThreadPool("test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
ThreadPool threadPool = new TestThreadPool("test");
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
() -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
sls.onMaster();
@ -304,8 +307,9 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
sls.offMaster();
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
} finally {
threadPool.shutdownNow();
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@ -43,13 +44,13 @@ public class SnapshotRetentionServiceTests extends ESTestCase {
clusterSettings = new ClusterSettings(Settings.EMPTY, internalSettings);
}
public void testJobsAreScheduled() {
public void testJobsAreScheduled() throws InterruptedException {
final DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(),
Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT);
ClockMock clock = new ClockMock();
try (ThreadPool threadPool = new TestThreadPool("test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
ThreadPool threadPool = new TestThreadPool("test");
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
FakeRetentionTask::new, clusterService, clock)) {
assertThat(service.getScheduler().jobCount(), equalTo(0));
@ -76,14 +77,14 @@ public class SnapshotRetentionServiceTests extends ESTestCase {
}
}
public void testManualTriggering() {
public void testManualTriggering() throws InterruptedException {
final DiscoveryNode discoveryNode = new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(),
Collections.emptyMap(), DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT);
ClockMock clock = new ClockMock();
AtomicInteger invoked = new AtomicInteger(0);
try (ThreadPool threadPool = new TestThreadPool("test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
ThreadPool threadPool = new TestThreadPool("test");
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings);
SnapshotRetentionService service = new SnapshotRetentionService(Settings.EMPTY,
() -> new FakeRetentionTask(event -> {
assertThat(event.getJobName(), equalTo(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID));
@ -101,8 +102,9 @@ public class SnapshotRetentionServiceTests extends ESTestCase {
service.onMaster();
service.triggerRetention();
assertThat(invoked.get(), equalTo(2));
} finally {
threadPool.shutdownNow();
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
}

View File

@ -154,8 +154,8 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
}
private void retentionTaskTest(final boolean deletionSuccess) throws Exception {
try (ThreadPool threadPool = new TestThreadPool("slm-test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
ThreadPool threadPool = new TestThreadPool("slm-test");
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
Client noOpClient = new NoOpClient("slm-test")) {
final String policyId = "policy";
@ -222,7 +222,7 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
boolean historySuccess = historyLatch.await(10, TimeUnit.SECONDS);
assertThat("expected history entries for 1 snapshot deletions", historySuccess, equalTo(true));
assertThat(deletedSnapshotsInHistory, contains(eligibleSnapshot.snapshotId().getName()));
} finally {
threadPool.shutdownNow();
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
@ -237,8 +237,8 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
}
private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception {
try (ThreadPool threadPool = new TestThreadPool("slm-test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
ThreadPool threadPool = new TestThreadPool("slm-test");
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
Client noOpClient = new NoOpClient("slm-test")) {
final String policyId = "policy";
@ -321,7 +321,7 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
boolean historySuccess = historyLatch.await(10, TimeUnit.SECONDS);
assertThat("expected history entries for 2 snapshot deletions", historySuccess, equalTo(true));
assertThat(deletedSnapshotsInHistory, containsInAnyOrder(snap1.snapshotId().getName(), snap2.snapshotId().getName()));
} finally {
threadPool.shutdownNow();
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
@ -374,8 +374,8 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
}
private void doTestSkipDuringMode(OperationMode mode) throws Exception {
try (ThreadPool threadPool = new TestThreadPool("slm-test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
ThreadPool threadPool = new TestThreadPool("slm-test");
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
Client noOpClient = new NoOpClient("slm-test")) {
final String policyId = "policy";
final String repoId = "repo";
@ -398,7 +398,7 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
long time = System.currentTimeMillis();
task.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time));
} finally {
threadPool.shutdownNow();
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}
@ -413,8 +413,8 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
}
private void doTestRunManuallyDuringMode(OperationMode mode) throws Exception {
try (ThreadPool threadPool = new TestThreadPool("slm-test");
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
ThreadPool threadPool = new TestThreadPool("slm-test");
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
Client noOpClient = new NoOpClient("slm-test")) {
final String policyId = "policy";
final String repoId = "repo";
@ -426,20 +426,22 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
AtomicBoolean retentionWasRun = new AtomicBoolean(false);
MockSnapshotRetentionTask task = new MockSnapshotRetentionTask(noOpClient, clusterService,
new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, (historyItem) -> { }),
new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC, (historyItem) -> {
}),
threadPool,
() -> {
retentionWasRun.set(true);
return Collections.emptyMap();
},
(deletionPolicyId, repo, snapId, slmStats, listener) -> { },
(deletionPolicyId, repo, snapId, slmStats, listener) -> {
},
System::nanoTime);
long time = System.currentTimeMillis();
task.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_MANUAL_JOB_ID, time, time));
assertTrue("retention should be run manually even if SLM is disabled", retentionWasRun.get());
} finally {
threadPool.shutdownNow();
threadPool.awaitTermination(10, TimeUnit.SECONDS);
}

View File

@ -49,9 +49,8 @@ import org.elasticsearch.xpack.security.audit.AuditTrailService;
import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail;
import org.elasticsearch.xpack.security.authc.Realms;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.After;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -143,14 +142,10 @@ public class SecurityTests extends ESTestCase {
return null;
}
@Before
public void cleanup() throws IOException {
if (threadContext != null) {
threadContext.stashContext();
threadContext.close();
@After
public void cleanup() {
threadContext = null;
}
}
public void testCustomRealmExtension() throws Exception {
Collection<Object> components = createComponents(Settings.EMPTY, new DummyExtension("myrealm"));

View File

@ -399,7 +399,7 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
}
public void testContextRestoreResponseHandlerRestoreOriginalContext() throws Exception {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
threadContext.putTransient("foo", "bar");
threadContext.putHeader("key", "value");
TransportResponseHandler<Empty> handler;
@ -444,7 +444,6 @@ public class SecurityServerTransportInterceptorTests extends ESTestCase {
assertEquals("bar", threadContext.getTransient("foo"));
assertEquals("value", threadContext.getHeader("key"));
}
}
private String[] randomRoles() {
return generateRandomStringArray(3, 10, false, true);