From e8b9880211c3d9ee3502c9bf06afb6089327402b Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 21 Jan 2016 15:30:25 +0100 Subject: [PATCH] Clarify semantics when defaults are overwritten. Default headers are now read-only fallbacks if the key is actually not in the headers map. That way we never serialize them across the wire and also never prevent them from being overwritten. --- .../client/support/AbstractClient.java | 2 +- .../common/util/concurrent/ThreadContext.java | 63 ++++++++++++------- .../util/concurrent/ThreadContextTests.java | 33 +++++++++- 3 files changed, 73 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java index 0e18eca2042..af98267dbc5 100644 --- a/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java +++ b/core/src/main/java/org/elasticsearch/client/support/AbstractClient.java @@ -1683,7 +1683,7 @@ public abstract class AbstractClient extends AbstractComponent implements Client @Override protected , Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder> void doExecute(Action action, Request request, ActionListener listener) { ThreadContext threadContext = threadPool().getThreadContext(); - try (ThreadContext.StoredContext ctx = threadContext.stashContext(headers)) { + try (ThreadContext.StoredContext ctx = threadContext.stashAndMergeHeaders(headers)) { super.doExecute(action, request, listener); } } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index 8b304e7a903..0e1db5c760d 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.util.concurrent; import org.apache.lucene.util.CloseableThreadLocal; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -61,7 +62,8 @@ import java.util.concurrent.atomic.AtomicBoolean; public final class ThreadContext implements Closeable, Writeable{ public static final String PREFIX = "request.headers"; - private final ThreadContextStruct defaultContext; + private final Map defaultHeader; + private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct(Collections.emptyMap()); private final ContextThreadLocal threadLocal; /** @@ -71,15 +73,15 @@ public final class ThreadContext implements Closeable, Writeable defaultHeader = new HashMap<>(); for (String key : headers.names()) { defaultHeader.put(key, headers.get(key)); } - this.defaultContext = new ThreadContextStruct(defaultHeader); + this.defaultHeader = Collections.unmodifiableMap(defaultHeader); } - threadLocal = new ContextThreadLocal(defaultContext); + threadLocal = new ContextThreadLocal(); } @Override @@ -100,12 +102,14 @@ public final class ThreadContext implements Closeable, Writeable headers) { + public StoredContext stashAndMergeHeaders(Map headers) { final ThreadContextStruct context = threadLocal.get(); - threadLocal.set(context.putHeaders(headers)); + Map newHeader = new HashMap<>(headers); + newHeader.putAll(context.headers); + threadLocal.set(DEFAULT_CONTEXT.putHeaders(newHeader)); return () -> { threadLocal.set(context); }; @@ -128,7 +132,7 @@ public final class ThreadContext implements Closeable, Writeablenull if not present */ public String getHeader(String key) { - return threadLocal.get().headers.get(key); + String value = threadLocal.get().headers.get(key); + if (value == null) { + return defaultHeader.get(key); + } + return value; } /** * Returns all of the current contexts headers */ public Map getHeaders() { - return threadLocal.get().headers; + HashMap map = new HashMap<>(defaultHeader); + map.putAll(threadLocal.get().headers); + return Collections.unmodifiableMap(map); } /** @@ -164,7 +174,7 @@ public final class ThreadContext implements Closeable, Writeable newHeaders = new HashMap<>(this.headers); + putSingleHeader(key, value, newHeaders); + return new ThreadContextStruct(newHeaders, transientHeaders); + } + + private void putSingleHeader(String key, String value, Map newHeaders) { + final String existingValue; + if ((existingValue = newHeaders.putIfAbsent(key, value)) != null) { + throw new IllegalArgumentException("value for key [" + key + "] already present"); + } + } + private ThreadContextStruct putHeaders(Map headers) { if (headers.isEmpty()) { return this; } else { - Map newHeaders = new HashMap<>(headers); // first add the new headers - newHeaders.putAll(this.headers); // now add the new ones - we do a merge and preserve already existing ones + final Map newHeaders = new HashMap<>(); + for (Map.Entry entry : headers.entrySet()) { + putSingleHeader(entry.getKey(), entry.getValue(), newHeaders); + } + newHeaders.putAll(this.headers); return new ThreadContextStruct(newHeaders, transientHeaders); } } @@ -269,17 +295,12 @@ public final class ThreadContext implements Closeable, Writeable { - private final ThreadContextStruct defaultStruct; private final AtomicBoolean closed = new AtomicBoolean(false); - private ContextThreadLocal(ThreadContextStruct defaultStruct) { - this.defaultStruct = defaultStruct; - } - @Override public void set(ThreadContextStruct object) { try { - if (object == defaultStruct) { + if (object == DEFAULT_CONTEXT) { super.set(null); } else { super.set(object); @@ -299,7 +320,7 @@ public final class ThreadContext implements Closeable, Writeable toMerge = new HashMap<>(); toMerge.put("foo", "baz"); toMerge.put("simon", "says"); - try (ThreadContext.StoredContext ctx = threadContext.stashContext(toMerge)) { + try (ThreadContext.StoredContext ctx = threadContext.stashAndMergeHeaders(toMerge)) { assertEquals("bar", threadContext.getHeader("foo")); assertEquals("says", threadContext.getHeader("simon")); - assertEquals(new Integer(1), threadContext.getTransient("ctx.foo")); + assertNull(threadContext.getTransient("ctx.foo")); assertEquals("1", threadContext.getHeader("default")); } @@ -160,4 +160,31 @@ public class ThreadContextTests extends ESTestCase { assertEquals(new Integer(1), threadContext.getTransient("ctx.foo")); assertEquals("1", threadContext.getHeader("default")); } + + public void testCanResetDefault() { + Settings build = Settings.builder().put("request.headers.default", "1").build(); + ThreadContext threadContext = new ThreadContext(build); + threadContext.putHeader("default", "2"); + assertEquals("2", threadContext.getHeader("default")); + } + + public void testStashAndMergeWithModifiedDefaults() { + Settings build = Settings.builder().put("request.headers.default", "1").build(); + ThreadContext threadContext = new ThreadContext(build); + HashMap toMerge = new HashMap<>(); + toMerge.put("default", "2"); + try (ThreadContext.StoredContext ctx = threadContext.stashAndMergeHeaders(toMerge)) { + assertEquals("2", threadContext.getHeader("default")); + } + + build = Settings.builder().put("request.headers.default", "1").build(); + threadContext = new ThreadContext(build); + threadContext.putHeader("default", "4"); + toMerge = new HashMap<>(); + toMerge.put("default", "2"); + try (ThreadContext.StoredContext ctx = threadContext.stashAndMergeHeaders(toMerge)) { + assertEquals("4", threadContext.getHeader("default")); + } + } + }