apply feedback from @jaymode

This adds the ability to filter clients multiple times and allows to inherit the outer context if done so.
This also adds a method to access all tasks in the threadpool in an unwrapped fashion
This commit is contained in:
Simon Willnauer 2016-01-19 21:03:44 +01:00
parent 3d0cedbabb
commit 11cf717e44
5 changed files with 85 additions and 17 deletions

View File

@ -1683,8 +1683,7 @@ public abstract class AbstractClient extends AbstractComponent implements Client
@Override
protected <Request extends ActionRequest<Request>, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
ThreadContext threadContext = threadPool().getThreadContext();
try (ThreadContext.StoredContext ctx = threadContext.stashContext()) {
threadContext.putHeader(headers);
try (ThreadContext.StoredContext ctx = threadContext.stashContext(headers)) {
super.doExecute(action, request, listener);
}
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
* An extension to thread pool executor, allowing (in the future) to add specific additional stats to it.
@ -105,6 +106,14 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
}
}
/**
* Returns a stream of all pending tasks. This is similar to {@link #getQueue()} but will expose the originally submitted
* {@link Runnable} instances rather than potentially wrapped ones.
*/
public Stream<Runnable> getTasks() {
return this.getQueue().stream().map(this::unwrap);
}
@Override
public String toString() {
StringBuilder b = new StringBuilder();

View File

@ -101,6 +101,18 @@ public final class ThreadContext implements Closeable, Writeable<ThreadContext.T
};
}
/**
* Removes the current context and resets a new context that contains a merge of the current context and the given headers. The removed context can be
* restored when closing the returned {@link StoredContext}
*/
public StoredContext stashContext(Map<String, String> headers) {
final ThreadContextStruct context = threadLocal.get();
threadLocal.set(context.putHeaders(headers));
return () -> {
threadLocal.set(context);
};
}
/**
* Just like {@link #stashContext()} but no default context is set.
*/
@ -214,8 +226,8 @@ public final class ThreadContext implements Closeable, Writeable<ThreadContext.T
if (headers.isEmpty()) {
return this;
} else {
Map<String, String> newHeaders = new HashMap<>(this.headers);
newHeaders.putAll(headers);
Map<String, String> newHeaders = new HashMap<>(headers); // first add the new headers
newHeaders.putAll(this.headers); // now add the new ones - we do a merge and preserve already existing ones
return new ThreadContextStruct(newHeaders, transientHeaders);
}
}

View File

@ -335,20 +335,17 @@ public class EsExecutorsTests extends ESTestCase {
threadContext.putTransient("foo", one);
EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext);
try {
executor.execute(new Runnable() {
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
fail();
}
assertEquals(threadContext.getHeader("foo"), "bar");
assertSame(threadContext.getTransient("foo"), one);
assertNull(threadContext.getHeader("bar"));
assertNull(threadContext.getTransient("bar"));
executed.countDown();
executor.execute(() -> {
try {
latch.await();
} catch (InterruptedException e) {
fail();
}
assertEquals(threadContext.getHeader("foo"), "bar");
assertSame(threadContext.getTransient("foo"), one);
assertNull(threadContext.getHeader("bar"));
assertNull(threadContext.getTransient("bar"));
executed.countDown();
});
threadContext.putTransient("bar", "boom");
threadContext.putHeader("bar", "boom");
@ -359,6 +356,32 @@ public class EsExecutorsTests extends ESTestCase {
latch.countDown();
terminate(executor);
}
}
public void testGetTasks() throws InterruptedException {
int pool = between(1, 10);
int queue = between(0, 100);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch executed = new CountDownLatch(1);
EsThreadPoolExecutor executor = EsExecutors.newFixed(getTestName(), pool, queue, EsExecutors.daemonThreadFactory("dummy"), threadContext);
try {
Runnable r = () -> {
latch.countDown();
try {
executed.await();
} catch (InterruptedException e) {
fail();
}
};
executor.execute(r);
latch.await();
executor.getTasks().forEach((runnable) -> assertSame(runnable, r));
executed.countDown();
} finally {
latch.countDown();
terminate(executor);
}
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
public class ThreadContextTests extends ESTestCase {
@ -47,6 +48,30 @@ public class ThreadContextTests extends ESTestCase {
assertEquals("1", threadContext.getHeader("default"));
}
public void testStashContextWithMerge() {
Settings build = Settings.builder().put("request.headers.default", "1").build();
ThreadContext threadContext = new ThreadContext(build);
threadContext.putHeader("foo", "bar");
threadContext.putTransient("ctx.foo", new Integer(1));
assertEquals("bar", threadContext.getHeader("foo"));
assertEquals(new Integer(1), threadContext.getTransient("ctx.foo"));
assertEquals("1", threadContext.getHeader("default"));
HashMap<String, String> toMerge = new HashMap<>();
toMerge.put("foo", "baz");
toMerge.put("simon", "says");
try (ThreadContext.StoredContext ctx = threadContext.stashContext(toMerge)) {
assertEquals("bar", threadContext.getHeader("foo"));
assertEquals("says", threadContext.getHeader("simon"));
assertEquals(new Integer(1), threadContext.getTransient("ctx.foo"));
assertEquals("1", threadContext.getHeader("default"));
}
assertNull(threadContext.getHeader("simon"));
assertEquals("bar", threadContext.getHeader("foo"));
assertEquals(new Integer(1), threadContext.getTransient("ctx.foo"));
assertEquals("1", threadContext.getHeader("default"));
}
public void testStoreContext() {
Settings build = Settings.builder().put("request.headers.default", "1").build();
ThreadContext threadContext = new ThreadContext(build);