From 40c63ad07addc55ae2a800201079d1521cc0f1ae Mon Sep 17 00:00:00 2001 From: uboness Date: Sat, 16 Aug 2014 02:36:35 +0200 Subject: [PATCH] Fixed a request headers bug in transport client Where the configured request headers were not sent with sniffing requests (both node/info & cluster state sniffing) --- .../cluster/state/ClusterStateResponse.java | 2 +- .../elasticsearch/client/support/Headers.java | 10 ++-- .../client/transport/TransportClient.java | 2 +- .../TransportClientNodesService.java | 12 +++-- .../client/AbstractClientHeadersTests.java | 25 +++++++--- .../InternalTransportClientTests.java | 2 +- .../TransportClientHeadersTests.java | 50 +++++++++++++++++++ .../TransportClientNodesServiceTests.java | 3 +- 8 files changed, 88 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java index 9ada2716e27..861a84a9e71 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/state/ClusterStateResponse.java @@ -38,7 +38,7 @@ public class ClusterStateResponse extends ActionResponse { public ClusterStateResponse() { } - ClusterStateResponse(ClusterName clusterName, ClusterState clusterState) { + public ClusterStateResponse(ClusterName clusterName, ClusterState clusterState) { this.clusterName = clusterName; this.clusterState = clusterState; } diff --git a/src/main/java/org/elasticsearch/client/support/Headers.java b/src/main/java/org/elasticsearch/client/support/Headers.java index 0b12eb77219..333c834ccfc 100644 --- a/src/main/java/org/elasticsearch/client/support/Headers.java +++ b/src/main/java/org/elasticsearch/client/support/Headers.java @@ -19,10 +19,10 @@ package org.elasticsearch.client.support; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.TransportMessage; /** * Client request headers picked up from the client settings. Applied to every @@ -34,7 +34,8 @@ public class Headers { public static final Headers EMPTY = new Headers(ImmutableSettings.EMPTY) { @Override - public void applyTo(ActionRequest request) { + public > M applyTo(M message) { + return message; } }; @@ -45,10 +46,11 @@ public class Headers { headers = resolveHeaders(settings); } - public void applyTo(ActionRequest request) { + public > M applyTo(M message) { for (String key : headers.names()) { - request.putHeader(key, headers.get(key)); + message.putHeader(key, headers.get(key)); } + return message; } static Settings resolveHeaders(Settings settings) { diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 3201761e54a..92314fd5ef1 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -100,7 +100,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde */ public class TransportClient extends AbstractClient { - private final Injector injector; + final Injector injector; private final Settings settings; diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 52358b126e5..b6453cf6977 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -33,6 +33,7 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Requests; +import org.elasticsearch.client.support.Headers; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; @@ -69,6 +70,8 @@ public class TransportClientNodesService extends AbstractComponent { private final Version minCompatibilityVersion; + private final Headers headers; + // nodes that are added to be discovered private volatile ImmutableList listedNodes = ImmutableList.of(); @@ -90,12 +93,14 @@ public class TransportClientNodesService extends AbstractComponent { private volatile boolean closed; @Inject - public TransportClientNodesService(Settings settings, ClusterName clusterName, TransportService transportService, ThreadPool threadPool, Version version) { + public TransportClientNodesService(Settings settings, ClusterName clusterName, TransportService transportService, + ThreadPool threadPool, Headers headers, Version version) { super(settings); this.clusterName = clusterName; this.transportService = transportService; this.threadPool = threadPool; this.minCompatibilityVersion = version.minimumCompatibilityVersion(); + this.headers = headers; this.nodesSamplerInterval = componentSettings.getAsTime("nodes_sampler_interval", timeValueSeconds(5)); this.pingTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(5)).millis(); @@ -342,7 +347,7 @@ public class TransportClientNodesService extends AbstractComponent { } try { NodesInfoResponse nodeInfo = transportService.submitRequest(listedNode, NodesInfoAction.NAME, - Requests.nodesInfoRequest("_local"), + headers.applyTo(Requests.nodesInfoRequest("_local")), TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout), new FutureTransportResponseHandler() { @Override @@ -413,8 +418,7 @@ public class TransportClientNodesService extends AbstractComponent { } } transportService.sendRequest(listedNode, ClusterStateAction.NAME, - Requests.clusterStateRequest() - .clear().nodes(true).local(true), + headers.applyTo(Requests.clusterStateRequest().clear().nodes(true).local(true)), TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout), new BaseTransportResponseHandler() { diff --git a/src/test/java/org/elasticsearch/client/AbstractClientHeadersTests.java b/src/test/java/org/elasticsearch/client/AbstractClientHeadersTests.java index 0b5455caa23..9a7b842a32c 100644 --- a/src/test/java/org/elasticsearch/client/AbstractClientHeadersTests.java +++ b/src/test/java/org/elasticsearch/client/AbstractClientHeadersTests.java @@ -132,6 +132,24 @@ public abstract class AbstractClientHeadersTests extends ElasticsearchTestCase { client.admin().indices().prepareFlush().execute().addListener(new AssertingActionListener(FlushAction.NAME)); } + protected static void assertHeaders(Map headers) { + assertThat(headers, notNullValue()); + assertThat(headers.size(), is(2)); + assertThat(headers.get("key1"), notNullValue()); + assertThat(headers.get("key1").toString(), equalTo("val1")); + assertThat(headers.get("key2"), notNullValue()); + assertThat(headers.get("key2").toString(), equalTo("val 2")); + } + + protected static void assertHeaders(TransportMessage message) { + assertThat(message.getHeaders(), notNullValue()); + assertThat(message.getHeaders().size(), is(2)); + assertThat(message.getHeader("key1"), notNullValue()); + assertThat(message.getHeader("key1").toString(), equalTo("val1")); + assertThat(message.getHeader("key2"), notNullValue()); + assertThat(message.getHeader("key2").toString(), equalTo("val 2")); + } + protected static class InternalException extends Exception { private final String action; @@ -165,12 +183,7 @@ public abstract class AbstractClientHeadersTests extends ElasticsearchTestCase { assertThat("expected action [" + action + "] to throw an internal exception", e, notNullValue()); assertThat(action, equalTo(((InternalException) e).action)); Map headers = ((InternalException) e).headers; - assertThat(headers, notNullValue()); - assertThat(headers.size(), is(2)); - assertThat(headers.get("key1"), notNullValue()); - assertThat(headers.get("key1").toString(), equalTo("val1")); - assertThat(headers.get("key2"), notNullValue()); - assertThat(headers.get("key2").toString(), equalTo("val 2")); + assertHeaders(headers); } public Throwable unwrap(Throwable t, Class exceptionType) { diff --git a/src/test/java/org/elasticsearch/client/transport/InternalTransportClientTests.java b/src/test/java/org/elasticsearch/client/transport/InternalTransportClientTests.java index 1d3cfae4517..527bda662ee 100644 --- a/src/test/java/org/elasticsearch/client/transport/InternalTransportClientTests.java +++ b/src/test/java/org/elasticsearch/client/transport/InternalTransportClientTests.java @@ -72,7 +72,7 @@ public class InternalTransportClientTests extends ElasticsearchTestCase { }; transportService = new TransportService(ImmutableSettings.EMPTY, transport, threadPool); transportService.start(); - transportClientNodesService = new TransportClientNodesService(ImmutableSettings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT); + transportClientNodesService = new TransportClientNodesService(ImmutableSettings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Headers.EMPTY, Version.CURRENT); Map actions = new HashMap<>(); actions.put(NodesInfoAction.NAME, NodesInfoAction.INSTANCE); actions.put(TestAction.NAME, TestAction.INSTANCE); diff --git a/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java b/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java index fe43475e4b9..608a6493b0c 100644 --- a/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java +++ b/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java @@ -19,22 +19,33 @@ package org.elasticsearch.client.transport; +import org.elasticsearch.Version; import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.AbstractClientHeadersTests; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; /** * @@ -55,8 +66,32 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTests { return client; } + @Test + public void testWithSniffing() throws Exception { + TransportClient client = new TransportClient(ImmutableSettings.builder() + .put("client.transport.sniff", true) + .put("cluster.name", "cluster1") + .put("client.transport.nodes_sampler_interval", "1s") + .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, InternalTransportService.class.getName()) + .put(HEADER_SETTINGS) + .build()); + + client.addTransportAddress(address); + + InternalTransportService service = (InternalTransportService) client.injector.getInstance(TransportService.class); + + if (!service.clusterStateLatch.await(5, TimeUnit.SECONDS)) { + fail("takes way too long to get the cluster state"); + } + + assertThat(client.connectedNodes().size(), is(1)); + assertThat(client.connectedNodes().get(0).getAddress(), is((TransportAddress) address)); + } + public static class InternalTransportService extends TransportService { + CountDownLatch clusterStateLatch = new CountDownLatch(1); + @Inject public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool) { super(settings, transport, threadPool); @@ -65,9 +100,18 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTests { @Override @SuppressWarnings("unchecked") public void sendRequest(DiscoveryNode node, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler handler) { if (NodesInfoAction.NAME.equals(action)) { + assertHeaders(request); ((TransportResponseHandler) handler).handleResponse(new NodesInfoResponse(ClusterName.DEFAULT, new NodeInfo[0])); return; } + if (ClusterStateAction.NAME.equals(action)) { + assertHeaders(request); + ClusterName cluster1 = new ClusterName("cluster1"); + ((TransportResponseHandler) handler).handleResponse(new ClusterStateResponse(cluster1, state(cluster1))); + clusterStateLatch.countDown(); + return; + } + handler.handleException(new TransportException("", new InternalException(action, request))); } @@ -83,4 +127,10 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTests { } } + private static ClusterState state(ClusterName clusterName) { + ClusterState.Builder builder = ClusterState.builder(clusterName); + builder.nodes(DiscoveryNodes.builder().put(new DiscoveryNode("node_id", address, Version.CURRENT))); + return builder.build(); + } + } diff --git a/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index e96981a7646..359890c3f8f 100644 --- a/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.client.transport; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.support.Headers; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.ImmutableSettings; @@ -59,7 +60,7 @@ public class TransportClientNodesServiceTests extends ElasticsearchTestCase { }; transportService = new TransportService(ImmutableSettings.EMPTY, transport, threadPool); transportService.start(); - transportClientNodesService = new TransportClientNodesService(ImmutableSettings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT); + transportClientNodesService = new TransportClientNodesService(ImmutableSettings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Headers.EMPTY, Version.CURRENT); nodesCount = randomIntBetween(1, 10); for (int i = 0; i < nodesCount; i++) {