Always restore the ThreadContext for operations delayed due to a block (#23349)

The IndexShardOperationsLock has a mechanism to delay operations if there is currently a block on the lock. These
delayed operations are executed when the block is released and are executed by a different thread. When the different
thread executes the operations, the ThreadContext is that of the thread that was blocking operations. In order to
preserve the ThreadContext, we need to store it and wrap the listener when the operation is delayed.
This commit is contained in:
Jay Modi 2017-02-24 08:21:04 -05:00 committed by GitHub
parent fd509d015c
commit 5490cb52b0
2 changed files with 62 additions and 5 deletions

View File

@ -20,9 +20,11 @@ package org.elasticsearch.index.shard;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
@ -32,6 +34,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class IndexShardOperationsLock implements Closeable {
private final ShardId shardId;
@ -126,11 +129,13 @@ public class IndexShardOperationsLock implements Closeable {
if (delayedOperations == null) {
delayedOperations = new ArrayList<>();
}
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
if (executorOnDelay != null) {
delayedOperations.add(
new ThreadedActionListener<>(logger, threadPool, executorOnDelay, onAcquired, forceExecution));
new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
} else {
delayedOperations.add(onAcquired);
delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
}
return;
}

View File

@ -18,9 +18,10 @@
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -35,7 +36,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@ -146,7 +148,57 @@ public class IndexShardOperationsLockTests extends ESTestCase {
block.acquire(future, ThreadPool.Names.GENERIC, true);
assertFalse(future.isDone());
}
future.get(1, TimeUnit.MINUTES).close();
future.get(1, TimeUnit.HOURS).close();
}
/**
* Tests that the ThreadContext is restored when a operation is executed after it has been delayed due to a block
*/
public void testThreadContextPreservedIfBlock() throws ExecutionException, InterruptedException, TimeoutException {
final ThreadContext context = threadPool.getThreadContext();
final Function<ActionListener<Releasable>, Boolean> contextChecker = (listener) -> {
if ("bar".equals(context.getHeader("foo")) == false) {
listener.onFailure(new IllegalStateException("context did not have value [bar] for header [foo]. Actual value [" +
context.getHeader("foo") + "]"));
} else if ("baz".equals(context.getTransient("bar")) == false) {
listener.onFailure(new IllegalStateException("context did not have value [baz] for transient [bar]. Actual value [" +
context.getTransient("bar") + "]"));
} else {
return true;
}
return false;
};
PlainActionFuture<Releasable> future = new PlainActionFuture<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
if (contextChecker.apply(this)) {
super.onResponse(releasable);
}
}
};
PlainActionFuture<Releasable> future2 = new PlainActionFuture<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
if (contextChecker.apply(this)) {
super.onResponse(releasable);
}
}
};
try (Releasable releasable = blockAndWait()) {
// we preserve the thread context here so that we have a different context in the call to acquire than the context present
// when the releasable is closed
try (ThreadContext.StoredContext ignore = context.newStoredContext(false)) {
context.putHeader("foo", "bar");
context.putTransient("bar", "baz");
// test both with and without a executor name
block.acquire(future, ThreadPool.Names.GENERIC, true);
block.acquire(future2, null, true);
}
assertFalse(future.isDone());
}
future.get(1, TimeUnit.HOURS).close();
future2.get(1, TimeUnit.HOURS).close();
}
protected Releasable blockAndWait() throws InterruptedException {