Send client headers from TransportClient (#30803)

This change adds a simple header to the transport client
that is present on the servers thread context that ensures
we can detect if a transport client talks to the server in a
specific request. This change also adds a header for xpack
to detect if the client has xpack installed.
This commit is contained in:
Simon Willnauer 2018-05-24 09:46:48 +02:00 committed by GitHub
parent 2c7559c575
commit 0bdfb5c5b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 25 additions and 2 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.client.transport;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
@ -129,7 +130,8 @@ public abstract class TransportClient extends AbstractClient {
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
}
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).build();
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).put(ThreadContext.PREFIX
+ "." + "transport_client", true).build();
final List<Closeable> resourcesToClose = new ArrayList<>();
final ThreadPool threadPool = new ThreadPool(settings);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));

View File

@ -1438,6 +1438,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
streamIn = new NamedWriteableAwareStreamInput(streamIn, namedWriteableRegistry);
streamIn.setVersion(version);
threadPool.getThreadContext().readHeaders(streamIn);
threadPool.getThreadContext().putTransient("_remote_address", remoteAddress);
if (TransportStatus.isRequest(status)) {
handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status);
} else {

View File

@ -139,6 +139,8 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase {
protected static void assertHeaders(Map<String, String> headers, Map<String, String> expected) {
assertNotNull(headers);
headers = new HashMap<>(headers);
headers.remove("transport_client"); // default header on TPC
assertEquals(expected.size(), headers.size());
for (Map.Entry<String, String> expectedEntry : expected.entrySet()) {
assertEquals(headers.get(expectedEntry.getKey()), expectedEntry.getValue());
@ -146,7 +148,6 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase {
}
protected static void assertHeaders(ThreadPool pool) {
Map<String, String> headers = new HashMap<>();
Settings asSettings = HEADER_SETTINGS.getAsSettings(ThreadContext.PREFIX);
assertHeaders(pool.getThreadContext().getHeaders(),
asSettings.keySet().stream().collect(Collectors.toMap(Function.identity(), k -> asSettings.get(k))));

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
@ -63,6 +64,17 @@ public class TransportClientTests extends ESTestCase {
}
}
public void testDefaultHeaderContainsPlugins() {
Settings baseSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.build();
try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) {
ThreadContext threadContext = client.threadPool().getThreadContext();
assertEquals("true", threadContext.getHeader("transport_client"));
assertEquals("true", threadContext.getHeader("test"));
}
}
public static class MockPlugin extends Plugin {
@Override
@ -70,6 +82,11 @@ public class TransportClientTests extends ESTestCase {
return Arrays.asList(new Entry[]{ new Entry(MockNamedWriteable.class, MockNamedWriteable.NAME, MockNamedWriteable::new)});
}
@Override
public Settings additionalSettings() {
return Settings.builder().put(ThreadContext.PREFIX + "." + "test", true).build();
}
public class MockNamedWriteable implements NamedWriteable {
static final String NAME = "mockNamedWritable";

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.license.DeleteLicenseAction;
@ -193,6 +194,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
final Settings.Builder builder = Settings.builder();
builder.put(SecuritySettings.addTransportSettings(settings));
builder.put(SecuritySettings.addUserSettings(settings));
builder.put(ThreadContext.PREFIX + "." + "has_xpack", true);
return builder.build();
} else {
return Settings.EMPTY;