diff --git a/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java b/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java index 461ef1ab62c..925a0bc24c4 100644 --- a/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java +++ b/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java @@ -19,8 +19,6 @@ package org.elasticsearch.action; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -31,8 +29,6 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import static org.elasticsearch.action.support.PlainActionFuture.newFuture; - /** * A generic proxy that will execute the given action against a specific node. */ @@ -52,13 +48,6 @@ public class TransportActionNodeProxy execute(DiscoveryNode node, Request request) throws ElasticsearchException { - PlainActionFuture future = newFuture(); - request.listenerThreaded(false); - execute(node, request, future); - return future; - } - public void execute(DiscoveryNode node, final Request request, final ActionListener listener) { ActionRequestValidationException validationException = request.validate(); if (validationException != null) { diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 0fa28baa951..1a6570ffc51 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -111,9 +111,6 @@ public class TransportClientNodesService extends AbstractComponent { this.nodesSampler = new SimpleNodeSampler(); } this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler()); - - // we want the transport service to throw connect exceptions, so we can retry - transportService.throwConnectException(true); } public ImmutableList transportAddresses() { @@ -190,25 +187,6 @@ public class TransportClientNodesService extends AbstractComponent { return this; } - public T execute(NodeCallback callback) throws ElasticsearchException { - ImmutableList nodes = this.nodes; - ensureNodesAreAvailable(nodes); - int index = getNodeNumber(); - for (int i = 0; i < nodes.size(); i++) { - DiscoveryNode node = nodes.get((index + i) % nodes.size()); - try { - return callback.doWithNode(node); - } catch (ElasticsearchException e) { - if (e.unwrapCause() instanceof ConnectTransportException) { - logConnectTransportException((ConnectTransportException) e.unwrapCause()); - } else { - throw e; - } - } - } - throw new NoNodeAvailableException("None of the configured nodes were available: " + nodes); - } - public void execute(NodeListenerCallback callback, ActionListener listener) throws ElasticsearchException { ImmutableList nodes = this.nodes; ensureNodesAreAvailable(nodes); @@ -217,12 +195,9 @@ public class TransportClientNodesService extends AbstractComponent { DiscoveryNode node = nodes.get((index) % nodes.size()); try { callback.doWithNode(node, retryListener); - } catch (ElasticsearchException e) { - if (e.unwrapCause() instanceof ConnectTransportException) { - retryListener.onFailure(e); - } else { - throw e; - } + } catch (Throwable t) { + //this exception can't come from the TransportService as it doesn't throw exception at all + listener.onFailure(t); } } @@ -255,9 +230,9 @@ public class TransportClientNodesService extends AbstractComponent { } else { try { callback.doWithNode(nodes.get((index + i) % nodes.size()), this); - } catch (Throwable e1) { - // retry the next one... - onFailure(e1); + } catch(Throwable t) { + //this exception can't come from the TransportService as it doesn't throw exceptions at all + listener.onFailure(t); } } } else { @@ -299,14 +274,6 @@ public class TransportClientNodesService extends AbstractComponent { } } - private void logConnectTransportException(ConnectTransportException connectTransportException) { - if (logger.isTraceEnabled()) { - logger.trace("Could not connect to [{}] for action [{}], error [{}] [{}]", connectTransportException, connectTransportException.node(), connectTransportException.action(), connectTransportException.status().name(), connectTransportException.getMessage()); - } else { - logger.debug("Could not connect to [{}] for action [{}], error [{}] [{}]", connectTransportException.node(), connectTransportException.action(), connectTransportException.status().name(), connectTransportException.getMessage()); - } - } - abstract class NodeSampler { public void sample() { synchronized (mutex) { @@ -507,13 +474,8 @@ public class TransportClientNodesService extends AbstractComponent { } } - public static interface NodeCallback { - - T doWithNode(DiscoveryNode node) throws ElasticsearchException; - } - public static interface NodeListenerCallback { - void doWithNode(DiscoveryNode node, ActionListener listener) throws ElasticsearchException; + void doWithNode(DiscoveryNode node, ActionListener listener); } } diff --git a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportAdminClient.java b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportAdminClient.java index be7d8d8fb5e..d2f51160d8b 100644 --- a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportAdminClient.java +++ b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportAdminClient.java @@ -22,7 +22,6 @@ package org.elasticsearch.client.transport.support; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.ClusterAdminClient; import org.elasticsearch.client.IndicesAdminClient; -import org.elasticsearch.client.transport.TransportClientNodesService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -32,17 +31,13 @@ import org.elasticsearch.common.settings.Settings; */ public class InternalTransportAdminClient extends AbstractComponent implements AdminClient { - private final TransportClientNodesService nodesService; - private final InternalTransportIndicesAdminClient indicesAdminClient; private final InternalTransportClusterAdminClient clusterAdminClient; @Inject - public InternalTransportAdminClient(Settings settings, TransportClientNodesService nodesService, - InternalTransportIndicesAdminClient indicesAdminClient, InternalTransportClusterAdminClient clusterAdminClient) { + public InternalTransportAdminClient(Settings settings, InternalTransportIndicesAdminClient indicesAdminClient, InternalTransportClusterAdminClient clusterAdminClient) { super(settings); - this.nodesService = nodesService; this.indicesAdminClient = indicesAdminClient; this.clusterAdminClient = clusterAdminClient; } diff --git a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java index ebbc593bd63..fcf6b0d3d39 100644 --- a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java @@ -20,8 +20,8 @@ package org.elasticsearch.client.transport.support; import com.google.common.collect.ImmutableMap; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.*; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.support.AbstractClient; @@ -89,13 +89,9 @@ public class InternalTransportClient extends AbstractClient { @SuppressWarnings("unchecked") @Override public > ActionFuture execute(final Action action, final Request request) { - final TransportActionNodeProxy proxy = actions.get(action); - return nodesService.execute(new TransportClientNodesService.NodeCallback>() { - @Override - public ActionFuture doWithNode(DiscoveryNode node) throws ElasticsearchException { - return proxy.execute(node, request); - } - }); + PlainActionFuture actionFuture = PlainActionFuture.newFuture(); + execute(action, request, actionFuture); + return actionFuture; } @SuppressWarnings("unchecked") @@ -104,7 +100,7 @@ public class InternalTransportClient extends AbstractClient { final TransportActionNodeProxy proxy = actions.get(action); nodesService.execute(new TransportClientNodesService.NodeListenerCallback() { @Override - public void doWithNode(DiscoveryNode node, ActionListener listener) throws ElasticsearchException { + public void doWithNode(DiscoveryNode node, ActionListener listener) { proxy.execute(node, request, listener); } }, listener); diff --git a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java index e5afe1af5e8..0c864b3936b 100644 --- a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java +++ b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java @@ -20,9 +20,9 @@ package org.elasticsearch.client.transport.support; import com.google.common.collect.ImmutableMap; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.*; import org.elasticsearch.action.admin.cluster.ClusterAction; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.ClusterAdminClient; import org.elasticsearch.client.support.AbstractClusterAdminClient; import org.elasticsearch.client.transport.TransportClientNodesService; @@ -69,13 +69,9 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli @SuppressWarnings("unchecked") @Override public > ActionFuture execute(final Action action, final Request request) { - final TransportActionNodeProxy proxy = actions.get(action); - return nodesService.execute(new TransportClientNodesService.NodeCallback>() { - @Override - public ActionFuture doWithNode(DiscoveryNode node) throws ElasticsearchException { - return proxy.execute(node, request); - } - }); + PlainActionFuture actionFuture = PlainActionFuture.newFuture(); + execute(action, request, actionFuture); + return actionFuture; } @SuppressWarnings("unchecked") @@ -84,7 +80,7 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli final TransportActionNodeProxy proxy = actions.get(action); nodesService.execute(new TransportClientNodesService.NodeListenerCallback() { @Override - public void doWithNode(DiscoveryNode node, ActionListener listener) throws ElasticsearchException { + public void doWithNode(DiscoveryNode node, ActionListener listener) { proxy.execute(node, request, listener); } }, listener); diff --git a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java index e9941de9a4b..ee416e29cab 100644 --- a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java +++ b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java @@ -20,9 +20,9 @@ package org.elasticsearch.client.transport.support; import com.google.common.collect.ImmutableMap; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.*; import org.elasticsearch.action.admin.indices.IndicesAction; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.support.AbstractIndicesAdminClient; import org.elasticsearch.client.transport.TransportClientNodesService; @@ -69,13 +69,9 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli @SuppressWarnings("unchecked") @Override public > ActionFuture execute(final Action action, final Request request) { - final TransportActionNodeProxy proxy = actions.get(action); - return nodesService.execute(new TransportClientNodesService.NodeCallback>() { - @Override - public ActionFuture doWithNode(DiscoveryNode node) throws ElasticsearchException { - return proxy.execute(node, request); - } - }); + PlainActionFuture actionFuture = PlainActionFuture.newFuture(); + execute(action, request, actionFuture); + return actionFuture; } @SuppressWarnings("unchecked") @@ -84,7 +80,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli final TransportActionNodeProxy proxy = actions.get(action); nodesService.execute(new TransportClientNodesService.NodeListenerCallback() { @Override - public void doWithNode(DiscoveryNode node, ActionListener listener) throws ElasticsearchException { + public void doWithNode(DiscoveryNode node, ActionListener listener) { proxy.execute(node, request, listener); } }, listener); diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java index e66689b59b8..604af502ac0 100644 --- a/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/src/main/java/org/elasticsearch/transport/TransportService.java @@ -69,7 +69,6 @@ public class TransportService extends AbstractLifecycleComponenttrue to indicate that a {@link ConnectTransportException} should be thrown when - * sending a message (otherwise, it will be passed to the response handler). Defaults to false. - *

- *

This is useful when logic based on connect failure is needed without having to wrap the handler, - * for example, in case of retries across several nodes. - */ - public void throwConnectException(boolean throwConnectException) { - this.throwConnectException = throwConnectException; - } - public TransportFuture submitRequest(DiscoveryNode node, String action, TransportRequest request, TransportResponseHandler handler) throws TransportException { return submitRequest(node, action, request, TransportRequestOptions.EMPTY, handler); @@ -190,12 +178,12 @@ public class TransportService extends AbstractLifecycleComponent void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, - final TransportResponseHandler handler) throws TransportException { + final TransportResponseHandler handler) { sendRequest(node, action, request, TransportRequestOptions.EMPTY, handler); } public void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, - final TransportRequestOptions options, TransportResponseHandler handler) throws TransportException { + final TransportRequestOptions options, TransportResponseHandler handler) { if (node == null) { throw new ElasticsearchIllegalStateException("can't send request to a null node"); } @@ -229,12 +217,6 @@ public class TransportService extends AbstractLifecycleComponent implements Transport { + + private final Random random; + + private boolean connectMode = true; + + private TransportServiceAdapter transportServiceAdapter; + + private final AtomicInteger connectTransportExceptions = new AtomicInteger(); + private final AtomicInteger failures = new AtomicInteger(); + private final AtomicInteger successes = new AtomicInteger(); + private final Set triedNodes = new CopyOnWriteArraySet<>(); + + FailAndRetryMockTransport(Random random) { + this.random = new Random(random.nextLong()); + } + + @Override + @SuppressWarnings("unchecked") + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + + //we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info + if (connectMode) { + TransportResponseHandler transportResponseHandler = transportServiceAdapter.remove(requestId); + NodeInfo nodeInfo = new NodeInfo(Version.CURRENT, Build.CURRENT, node, null, null, null, null, null, null, null, null, null, null); + NodesInfoResponse nodesInfoResponse = new NodesInfoResponse(ClusterName.DEFAULT, new NodeInfo[]{nodeInfo}); + transportResponseHandler.handleResponse(nodesInfoResponse); + return; + } + + //once nodes are connected we'll just return errors for each sendRequest call + triedNodes.add(node); + + if (RandomInts.randomInt(random, 100) > 10) { + connectTransportExceptions.incrementAndGet(); + throw new ConnectTransportException(node, "node not available"); + } else { + if (random.nextBoolean()) { + failures.incrementAndGet(); + //throw whatever exception that is not a subclass of ConnectTransportException + throw new IllegalStateException(); + } else { + TransportResponseHandler transportResponseHandler = transportServiceAdapter.remove(requestId); + if (random.nextBoolean()) { + successes.incrementAndGet(); + transportResponseHandler.handleResponse(newResponse()); + } else { + failures.incrementAndGet(); + transportResponseHandler.handleException(new TransportException("transport exception")); + } + } + } + } + + protected abstract Response newResponse(); + + public void endConnectMode() { + this.connectMode = false; + } + + public int connectTransportExceptions() { + return connectTransportExceptions.get(); + } + + public int failures() { + return failures.get(); + } + + public int successes() { + return successes.get(); + } + + public Set triedNodes() { + return triedNodes; + } + + @Override + public void transportServiceAdapter(TransportServiceAdapter transportServiceAdapter) { + this.transportServiceAdapter = transportServiceAdapter; + } + + @Override + public BoundTransportAddress boundAddress() { + return null; + } + + @Override + public TransportAddress[] addressesFromString(String address) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addressSupported(Class address) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean nodeConnected(DiscoveryNode node) { + return false; + } + + @Override + public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + + } + + @Override + public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException { + + } + + @Override + public void disconnectFromNode(DiscoveryNode node) { + + } + + @Override + public long serverOpen() { + return 0; + } + + @Override + public Lifecycle.State lifecycleState() { + return null; + } + + @Override + public void addLifecycleListener(LifecycleListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public void removeLifecycleListener(LifecycleListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public Transport start() throws ElasticsearchException { + return null; + } + + @Override + public Transport stop() throws ElasticsearchException { + return null; + } + + @Override + public void close() throws ElasticsearchException { + + } +} diff --git a/src/test/java/org/elasticsearch/client/transport/InternalTransportClientTests.java b/src/test/java/org/elasticsearch/client/transport/InternalTransportClientTests.java new file mode 100644 index 00000000000..b420be33354 --- /dev/null +++ b/src/test/java/org/elasticsearch/client/transport/InternalTransportClientTests.java @@ -0,0 +1,340 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.action.*; +import org.elasticsearch.action.admin.cluster.ClusterAction; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; +import org.elasticsearch.action.admin.indices.IndicesAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.ClusterAdminClient; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.client.transport.support.InternalTransportAdminClient; +import org.elasticsearch.client.transport.support.InternalTransportClient; +import org.elasticsearch.client.transport.support.InternalTransportClusterAdminClient; +import org.elasticsearch.client.transport.support.InternalTransportIndicesAdminClient; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.Test; + +import java.io.Closeable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; + +public class InternalTransportClientTests extends ElasticsearchTestCase { + + private static class TestIteration implements Closeable { + private final ThreadPool threadPool; + private final FailAndRetryMockTransport transport; + private final TransportService transportService; + private final TransportClientNodesService transportClientNodesService; + private final InternalTransportClient internalTransportClient; + private final int nodesCount; + + TestIteration() { + threadPool = new ThreadPool("internal-transport-client-tests"); + transport = new FailAndRetryMockTransport(getRandom()) { + @Override + protected TestResponse newResponse() { + return new TestResponse(); + } + }; + transportService = new TransportService(ImmutableSettings.EMPTY, transport, threadPool); + transportService.start(); + transportClientNodesService = new TransportClientNodesService(ImmutableSettings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT); + Map actions = new HashMap<>(); + actions.put(NodesInfoAction.NAME, NodesInfoAction.INSTANCE); + actions.put(TestAction.NAME, TestAction.INSTANCE); + actions.put(IndicesAdminTestAction.NAME, IndicesAdminTestAction.INSTANCE); + actions.put(ClusterAdminTestAction.NAME, ClusterAdminTestAction.INSTANCE); + + InternalTransportIndicesAdminClient indicesAdminClient = new InternalTransportIndicesAdminClient(ImmutableSettings.EMPTY, transportClientNodesService, transportService, threadPool, actions); + InternalTransportClusterAdminClient clusterAdminClient = new InternalTransportClusterAdminClient(ImmutableSettings.EMPTY, transportClientNodesService, threadPool, transportService, actions); + InternalTransportAdminClient adminClient = new InternalTransportAdminClient(ImmutableSettings.EMPTY, indicesAdminClient, clusterAdminClient); + internalTransportClient = new InternalTransportClient(ImmutableSettings.EMPTY, threadPool, transportService, transportClientNodesService, adminClient, actions); + + nodesCount = randomIntBetween(1, 10); + for (int i = 0; i < nodesCount; i++) { + transportClientNodesService.addTransportAddresses(new LocalTransportAddress("node" + i)); + } + transport.endConnectMode(); + } + + public void close() { + threadPool.shutdown(); + try { + threadPool.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().isInterrupted(); + } + transportService.stop(); + transportClientNodesService.close(); + internalTransportClient.close(); + } + } + + @Test + public void testListenerFailures() throws InterruptedException { + + int iters = iterations(10, 100); + for (int i = 0; i < iters; i++) { + try(final TestIteration iteration = new TestIteration()) { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger finalFailures = new AtomicInteger(); + final AtomicReference finalFailure = new AtomicReference<>(); + final AtomicReference response = new AtomicReference<>(); + ActionListener actionListener = new ActionListener() { + @Override + public void onResponse(TestResponse testResponse) { + response.set(testResponse); + latch.countDown(); + } + + @Override + public void onFailure(Throwable e) { + finalFailures.incrementAndGet(); + finalFailure.set(e); + latch.countDown(); + } + }; + + final AtomicInteger preSendFailures = new AtomicInteger(); + + Action action = randomFrom(Action.values()); + action.execute(iteration, actionListener); + + assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true)); + + //there can be only either one failure that causes the request to fail straightaway or success + assertThat(preSendFailures.get() + iteration.transport.failures() + iteration.transport.successes(), lessThanOrEqualTo(1)); + + if (iteration.transport.successes() == 1) { + assertThat(finalFailures.get(), equalTo(0)); + assertThat(finalFailure.get(), nullValue()); + assertThat(response.get(), notNullValue()); + } else { + assertThat(finalFailures.get(), equalTo(1)); + assertThat(finalFailure.get(), notNullValue()); + assertThat(response.get(), nullValue()); + if (preSendFailures.get() == 0 && iteration.transport.failures() == 0) { + assertThat(finalFailure.get(), instanceOf(NoNodeAvailableException.class)); + } + } + + assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount)); + assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes())); + } + } + } + + @Test + public void testSyncFailures() throws InterruptedException { + + int iters = iterations(10, 100); + for (int i = 0; i < iters; i++) { + try(final TestIteration iteration = new TestIteration()) { + TestResponse testResponse = null; + Throwable finalFailure = null; + + try { + Action action = randomFrom(Action.values()); + ActionFuture future = action.execute(iteration); + testResponse = future.actionGet(); + } catch (Throwable t) { + finalFailure = t; + } + + //there can be only either one failure that causes the request to fail straightaway or success + assertThat(iteration.transport.failures() + iteration.transport.successes(), lessThanOrEqualTo(1)); + + if (iteration.transport.successes() == 1) { + assertThat(finalFailure, nullValue()); + assertThat(testResponse, notNullValue()); + } else { + assertThat(testResponse, nullValue()); + assertThat(finalFailure, notNullValue()); + if (iteration.transport.failures() == 0) { + assertThat(finalFailure, instanceOf(NoNodeAvailableException.class)); + } + } + + assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount)); + assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes())); + } + } + } + + private static enum Action { + TEST { + @Override + ActionFuture execute(TestIteration iteration) { + return iteration.internalTransportClient.execute(TestAction.INSTANCE, new TestRequest()); + } + + @Override + void execute(TestIteration iteration, ActionListener listener) { + iteration.internalTransportClient.execute(TestAction.INSTANCE, new TestRequest(), listener); + } + }, + INDICES_ADMIN { + @Override + ActionFuture execute(TestIteration iteration) { + return iteration.internalTransportClient.admin().indices().execute(IndicesAdminTestAction.INSTANCE, new TestRequest()); + } + + @Override + void execute(TestIteration iteration, ActionListener listener) { + iteration.internalTransportClient.admin().indices().execute(IndicesAdminTestAction.INSTANCE, new TestRequest(), listener); + } + }, + CLUSTER_ADMIN { + @Override + ActionFuture execute(TestIteration iteration) { + return iteration.internalTransportClient.admin().cluster().execute(ClusterAdminTestAction.INSTANCE, new TestRequest()); + } + + @Override + void execute(TestIteration iteration, ActionListener listener) { + iteration.internalTransportClient.admin().cluster().execute(ClusterAdminTestAction.INSTANCE, new TestRequest(), listener); + } + }; + + abstract ActionFuture execute(TestIteration iteration); + + abstract void execute(TestIteration iteration, ActionListener listener); + } + + private static class TestRequest extends ActionRequest { + @Override + public ActionRequestValidationException validate() { + return null; + } + } + + private static class TestResponse extends ActionResponse { + + } + + private static class TestAction extends ClientAction { + static final String NAME = "test-action"; + static final TestAction INSTANCE = new TestAction(NAME); + + private TestAction(String name) { + super(name); + } + + @Override + public TestRequestBuilder newRequestBuilder(Client client) { + throw new UnsupportedOperationException(); + } + + @Override + public TestResponse newResponse() { + return new TestResponse(); + } + } + + private static class TestRequestBuilder extends ActionRequestBuilder { + + protected TestRequestBuilder(Client client, TestRequest request) { + super(client, request); + } + + @Override + protected void doExecute(ActionListener listener) { + throw new UnsupportedOperationException(); + } + } + + private static class IndicesAdminTestAction extends IndicesAction { + static final String NAME = "test-indices-action"; + static final IndicesAdminTestAction INSTANCE = new IndicesAdminTestAction(NAME); + + private IndicesAdminTestAction(String name) { + super(name); + } + + @Override + public IndicesAdminTestRequestBuilder newRequestBuilder(IndicesAdminClient client) { + throw new UnsupportedOperationException(); + } + + @Override + public TestResponse newResponse() { + return new TestResponse(); + } + } + + private static class IndicesAdminTestRequestBuilder extends ActionRequestBuilder { + + protected IndicesAdminTestRequestBuilder(IndicesAdminClient client, TestRequest request) { + super(client, request); + } + + @Override + protected void doExecute(ActionListener listener) { + throw new UnsupportedOperationException(); + } + } + + private static class ClusterAdminTestAction extends ClusterAction { + static final String NAME = "test-cluster-action"; + static final ClusterAdminTestAction INSTANCE = new ClusterAdminTestAction(NAME); + + private ClusterAdminTestAction(String name) { + super(name); + } + + @Override + public ClusterAdminTestRequestBuilder newRequestBuilder(ClusterAdminClient client) { + throw new UnsupportedOperationException(); + } + + @Override + public TestResponse newResponse() { + return new TestResponse(); + } + } + + private static class ClusterAdminTestRequestBuilder extends ActionRequestBuilder { + + protected ClusterAdminTestRequestBuilder(ClusterAdminClient client, TestRequest request) { + super(client, request); + } + + @Override + protected void doExecute(ActionListener listener) { + throw new UnsupportedOperationException(); + } + } +} diff --git a/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java new file mode 100644 index 00000000000..e96981a7646 --- /dev/null +++ b/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -0,0 +1,174 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; +import org.junit.Test; + +import java.io.Closeable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; + +public class TransportClientNodesServiceTests extends ElasticsearchTestCase { + + private static class TestIteration implements Closeable { + private final ThreadPool threadPool; + private final FailAndRetryMockTransport transport; + private final TransportService transportService; + private final TransportClientNodesService transportClientNodesService; + private final int nodesCount; + + TestIteration() { + threadPool = new ThreadPool("transport-client-nodes-service-tests"); + transport = new FailAndRetryMockTransport(getRandom()) { + @Override + protected TestResponse newResponse() { + return new TestResponse(); + } + }; + transportService = new TransportService(ImmutableSettings.EMPTY, transport, threadPool); + transportService.start(); + transportClientNodesService = new TransportClientNodesService(ImmutableSettings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT); + + nodesCount = randomIntBetween(1, 10); + for (int i = 0; i < nodesCount; i++) { + transportClientNodesService.addTransportAddresses(new LocalTransportAddress("node" + i)); + } + transport.endConnectMode(); + } + + public void close() { + threadPool.shutdown(); + try { + threadPool.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().isInterrupted(); + } + transportService.stop(); + transportClientNodesService.close(); + } + } + + @Test + public void testListenerFailures() throws InterruptedException { + + int iters = iterations(10, 100); + for (int i = 0; i finalFailure = new AtomicReference<>(); + final AtomicReference response = new AtomicReference<>(); + ActionListener actionListener = new ActionListener() { + @Override + public void onResponse(TestResponse testResponse) { + response.set(testResponse); + latch.countDown(); + } + + @Override + public void onFailure(Throwable e) { + finalFailures.incrementAndGet(); + finalFailure.set(e); + latch.countDown(); + } + }; + + final AtomicInteger preSendFailures = new AtomicInteger(); + + iteration.transportClientNodesService.execute(new TransportClientNodesService.NodeListenerCallback() { + @Override + public void doWithNode(DiscoveryNode node, final ActionListener retryListener) { + if (rarely()) { + preSendFailures.incrementAndGet(); + //throw whatever exception that is not a subclass of ConnectTransportException + throw new IllegalArgumentException(); + } + + iteration.transportService.sendRequest(node, "action", new TestRequest(), new TransportRequestOptions().withTimeout(50), new BaseTransportResponseHandler() { + @Override + public TestResponse newInstance() { + return new TestResponse(); + } + + @Override + public void handleResponse(TestResponse response) { + retryListener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + retryListener.onFailure(exp); + } + + @Override + public String executor() { + return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC; + } + }); + } + }, actionListener); + + assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true)); + + //there can be only either one failure that causes the request to fail straightaway or success + assertThat(preSendFailures.get() + iteration.transport.failures() + iteration.transport.successes(), lessThanOrEqualTo(1)); + + if (iteration.transport.successes() == 1) { + assertThat(finalFailures.get(), equalTo(0)); + assertThat(finalFailure.get(), nullValue()); + assertThat(response.get(), notNullValue()); + } else { + assertThat(finalFailures.get(), equalTo(1)); + assertThat(finalFailure.get(), notNullValue()); + assertThat(response.get(), nullValue()); + if (preSendFailures.get() == 0 && iteration.transport.failures() == 0) { + assertThat(finalFailure.get(), instanceOf(NoNodeAvailableException.class)); + } + } + + assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount)); + assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes())); + } + } + } + + private static class TestRequest extends TransportRequest { + + } + + private static class TestResponse extends TransportResponse { + + } +} diff --git a/src/test/java/org/elasticsearch/client/transport/TransportClientRetryTests.java b/src/test/java/org/elasticsearch/client/transport/TransportClientRetryTests.java new file mode 100644 index 00000000000..c1d11bb2312 --- /dev/null +++ b/src/test/java/org/elasticsearch/client/transport/TransportClientRetryTests.java @@ -0,0 +1,96 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.transport; + +import com.google.common.base.Predicate; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.transport.TransportService; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +@ClusterScope(scope = Scope.TEST, numClientNodes = 0) +public class TransportClientRetryTests extends ElasticsearchIntegrationTest { + + @Test + public void testRetry() throws IOException, ExecutionException, InterruptedException { + + Iterable instances = internalCluster().getInstances(TransportService.class); + TransportAddress[] addresses = new TransportAddress[internalCluster().size()]; + int i = 0; + for (TransportService instance : instances) { + addresses[i++] = instance.boundAddress().publishAddress(); + } + + ImmutableSettings.Builder builder = settingsBuilder().put("client.transport.nodes_sampler_interval", "1s") + .put("name", "transport_client_retry_test") + .put("node.mode", InternalTestCluster.nodeMode()) + .put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false) + .put(ClusterName.SETTING, internalCluster().getClusterName()) + .put("config.ignore_system_properties", true); + + try (TransportClient transportClient = new TransportClient(builder.build())) { + transportClient.addTransportAddresses(addresses); + assertThat(transportClient.connectedNodes().size(), equalTo(internalCluster().size())); + + int size = cluster().size(); + //kill all nodes one by one, leaving a single master/data node at the end of the loop + for (int j = 1; j < size; j++) { + internalCluster().stopRandomNode(new Predicate() { + @Override + public boolean apply(Settings input) { + return true; + } + }); + + ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest().local(true); + ClusterState clusterState; + //use both variants of execute method: with and without listener + if (randomBoolean()) { + clusterState = transportClient.admin().cluster().state(clusterStateRequest).get().getState(); + } else { + PlainListenableActionFuture future = new PlainListenableActionFuture<>(clusterStateRequest.listenerThreaded(), transportClient.threadPool()); + transportClient.admin().cluster().state(clusterStateRequest, future); + clusterState = future.get().getState(); + } + assertThat(clusterState.nodes().size(), greaterThanOrEqualTo(size - j)); + assertThat(transportClient.connectedNodes().size(), greaterThanOrEqualTo(size - j)); + } + } + } +} diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 2d5b4d7e188..9c3f6d434cf 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -147,7 +147,7 @@ public final class InternalTestCluster extends TestCluster { static final boolean DEFAULT_ENABLE_RANDOM_BENCH_NODES = true; - private static final String NODE_MODE = nodeMode(); + static final String NODE_MODE = nodeMode(); /* sorted map to make traverse order reproducible, concurrent since we do checks on it not within a sync block */ private final NavigableMap nodes = new TreeMap<>(); @@ -259,7 +259,7 @@ public final class InternalTestCluster extends TestCluster { } - private static String nodeMode() { + public static String nodeMode() { Builder builder = ImmutableSettings.builder(); if (Strings.isEmpty(System.getProperty("es.node.mode"))&& Strings.isEmpty(System.getProperty("es.node.local"))) { return "local"; // default if nothing is specified