Clarify semantics when defaults are overwritten.

Default headers are now read-only fallbacks if the key is actually not in
the headers map. That way we never serialize them across the wire and also never
prevent them from being  overwritten.
This commit is contained in:
Simon Willnauer 2016-01-21 15:30:25 +01:00
parent 142547271e
commit e8b9880211
3 changed files with 73 additions and 25 deletions

View File

@ -1683,7 +1683,7 @@ public abstract class AbstractClient extends AbstractComponent implements Client
@Override
protected <Request extends ActionRequest<Request>, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
ThreadContext threadContext = threadPool().getThreadContext();
try (ThreadContext.StoredContext ctx = threadContext.stashContext(headers)) {
try (ThreadContext.StoredContext ctx = threadContext.stashAndMergeHeaders(headers)) {
super.doExecute(action, request, listener);
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.util.concurrent;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -61,7 +62,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
public final class ThreadContext implements Closeable, Writeable<ThreadContext.ThreadContextStruct>{
public static final String PREFIX = "request.headers";
private final ThreadContextStruct defaultContext;
private final Map<String, String> defaultHeader;
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct(Collections.emptyMap());
private final ContextThreadLocal threadLocal;
/**
@ -71,15 +73,15 @@ public final class ThreadContext implements Closeable, Writeable<ThreadContext.T
public ThreadContext(Settings settings) {
Settings headers = settings.getAsSettings(PREFIX);
if (headers == null) {
this.defaultContext = new ThreadContextStruct(Collections.emptyMap());
this.defaultHeader = Collections.emptyMap();
} else {
Map<String, String> defaultHeader = new HashMap<>();
for (String key : headers.names()) {
defaultHeader.put(key, headers.get(key));
}
this.defaultContext = new ThreadContextStruct(defaultHeader);
this.defaultHeader = Collections.unmodifiableMap(defaultHeader);
}
threadLocal = new ContextThreadLocal(defaultContext);
threadLocal = new ContextThreadLocal();
}
@Override
@ -100,12 +102,14 @@ public final class ThreadContext implements Closeable, Writeable<ThreadContext.T
}
/**
* Removes the current context and resets a new context that contains a merge of the current context and the given headers. The removed context can be
* restored when closing the returned {@link StoredContext}
* Removes the current context and resets a new context that contains a merge of the current headers and the given headers. The removed context can be
* restored when closing the returned {@link StoredContext}. The merge strategy is that headers that are already existing are preserved unless they are defaults.
*/
public StoredContext stashContext(Map<String, String> headers) {
public StoredContext stashAndMergeHeaders(Map<String, String> headers) {
final ThreadContextStruct context = threadLocal.get();
threadLocal.set(context.putHeaders(headers));
Map<String, String> newHeader = new HashMap<>(headers);
newHeader.putAll(context.headers);
threadLocal.set(DEFAULT_CONTEXT.putHeaders(newHeader));
return () -> {
threadLocal.set(context);
};
@ -128,7 +132,7 @@ public final class ThreadContext implements Closeable, Writeable<ThreadContext.T
@Override
public ThreadContextStruct readFrom(StreamInput in) throws IOException {
return defaultContext.readFrom(in);
return DEFAULT_CONTEXT.readFrom(in);
}
/**
@ -143,14 +147,20 @@ public final class ThreadContext implements Closeable, Writeable<ThreadContext.T
* Returns the header for the given key or <code>null</code> if not present
*/
public String getHeader(String key) {
return threadLocal.get().headers.get(key);
String value = threadLocal.get().headers.get(key);
if (value == null) {
return defaultHeader.get(key);
}
return value;
}
/**
* Returns all of the current contexts headers
*/
public Map<String, String> getHeaders() {
return threadLocal.get().headers;
HashMap<String, String> map = new HashMap<>(defaultHeader);
map.putAll(threadLocal.get().headers);
return Collections.unmodifiableMap(map);
}
/**
@ -164,7 +174,7 @@ public final class ThreadContext implements Closeable, Writeable<ThreadContext.T
* Puts a header into the context
*/
public void putHeader(String key, String value) {
putHeader(Collections.singletonMap(key, value));
threadLocal.set(threadLocal.get().putPersistent(key, value));
}
/**
@ -220,12 +230,28 @@ public final class ThreadContext implements Closeable, Writeable<ThreadContext.T
this(headers, Collections.emptyMap());
}
private ThreadContextStruct putPersistent(String key, String value) {
Map<String, String> newHeaders = new HashMap<>(this.headers);
putSingleHeader(key, value, newHeaders);
return new ThreadContextStruct(newHeaders, transientHeaders);
}
private void putSingleHeader(String key, String value, Map<String, String> newHeaders) {
final String existingValue;
if ((existingValue = newHeaders.putIfAbsent(key, value)) != null) {
throw new IllegalArgumentException("value for key [" + key + "] already present");
}
}
private ThreadContextStruct putHeaders(Map<String, String> headers) {
if (headers.isEmpty()) {
return this;
} else {
Map<String, String> newHeaders = new HashMap<>(headers); // first add the new headers
newHeaders.putAll(this.headers); // now add the new ones - we do a merge and preserve already existing ones
final Map<String, String> newHeaders = new HashMap<>();
for (Map.Entry<String, String> entry : headers.entrySet()) {
putSingleHeader(entry.getKey(), entry.getValue(), newHeaders);
}
newHeaders.putAll(this.headers);
return new ThreadContextStruct(newHeaders, transientHeaders);
}
}
@ -269,17 +295,12 @@ public final class ThreadContext implements Closeable, Writeable<ThreadContext.T
}
private static class ContextThreadLocal extends CloseableThreadLocal<ThreadContextStruct> {
private final ThreadContextStruct defaultStruct;
private final AtomicBoolean closed = new AtomicBoolean(false);
private ContextThreadLocal(ThreadContextStruct defaultStruct) {
this.defaultStruct = defaultStruct;
}
@Override
public void set(ThreadContextStruct object) {
try {
if (object == defaultStruct) {
if (object == DEFAULT_CONTEXT) {
super.set(null);
} else {
super.set(object);
@ -299,7 +320,7 @@ public final class ThreadContext implements Closeable, Writeable<ThreadContext.T
if (threadContextStruct != null) {
return threadContextStruct;
}
return defaultStruct;
return DEFAULT_CONTEXT;
} catch (NullPointerException ex) {
/* This is odd but CloseableThreadLocal throws a NPE if it was closed but still accessed.
to get a real exception we call ensureOpen() to tell the user we are already closed.*/

View File

@ -48,7 +48,7 @@ public class ThreadContextTests extends ESTestCase {
assertEquals("1", threadContext.getHeader("default"));
}
public void testStashContextWithMerge() {
public void testStashAndMerge() {
Settings build = Settings.builder().put("request.headers.default", "1").build();
ThreadContext threadContext = new ThreadContext(build);
threadContext.putHeader("foo", "bar");
@ -59,10 +59,10 @@ public class ThreadContextTests extends ESTestCase {
HashMap<String, String> toMerge = new HashMap<>();
toMerge.put("foo", "baz");
toMerge.put("simon", "says");
try (ThreadContext.StoredContext ctx = threadContext.stashContext(toMerge)) {
try (ThreadContext.StoredContext ctx = threadContext.stashAndMergeHeaders(toMerge)) {
assertEquals("bar", threadContext.getHeader("foo"));
assertEquals("says", threadContext.getHeader("simon"));
assertEquals(new Integer(1), threadContext.getTransient("ctx.foo"));
assertNull(threadContext.getTransient("ctx.foo"));
assertEquals("1", threadContext.getHeader("default"));
}
@ -160,4 +160,31 @@ public class ThreadContextTests extends ESTestCase {
assertEquals(new Integer(1), threadContext.getTransient("ctx.foo"));
assertEquals("1", threadContext.getHeader("default"));
}
public void testCanResetDefault() {
Settings build = Settings.builder().put("request.headers.default", "1").build();
ThreadContext threadContext = new ThreadContext(build);
threadContext.putHeader("default", "2");
assertEquals("2", threadContext.getHeader("default"));
}
public void testStashAndMergeWithModifiedDefaults() {
Settings build = Settings.builder().put("request.headers.default", "1").build();
ThreadContext threadContext = new ThreadContext(build);
HashMap<String, String> toMerge = new HashMap<>();
toMerge.put("default", "2");
try (ThreadContext.StoredContext ctx = threadContext.stashAndMergeHeaders(toMerge)) {
assertEquals("2", threadContext.getHeader("default"));
}
build = Settings.builder().put("request.headers.default", "1").build();
threadContext = new ThreadContext(build);
threadContext.putHeader("default", "4");
toMerge = new HashMap<>();
toMerge.put("default", "2");
try (ThreadContext.StoredContext ctx = threadContext.stashAndMergeHeaders(toMerge)) {
assertEquals("4", threadContext.getHeader("default"));
}
}
}