From fcf4d5a38de733eade12c4af4388aa946bbe5235 Mon Sep 17 00:00:00 2001 From: javanna Date: Fri, 11 Jul 2014 12:23:20 +0200 Subject: [PATCH] Transport Client: fixed the node retry mechanism which could fail without trying all the connected nodes. The RetryListener was notified twice for each single failure, which caused some additional retries, but more importantly was making the client reach the maximum number of retries (number of connected nodes) too quickly, while ongoing retries that could still succeed had not completed yet. The TransportService used to throw ConnectTransportException due to throwConnectException being set to true, and also notify the listener of any exception received from a separate thread through the request holder. Simplified exception handling by just removing the throwConnectException option from the TransportService, used only in the transport client. The transport client now relies solely on the request holder to notify of failures and eventually retry. Closes #6829 --- .../action/TransportActionNodeProxy.java | 11 - .../TransportClientNodesService.java | 52 +-- .../support/InternalTransportAdminClient.java | 7 +- .../support/InternalTransportClient.java | 14 +- .../InternalTransportClusterAdminClient.java | 14 +- .../InternalTransportIndicesAdminClient.java | 14 +- .../transport/TransportService.java | 22 +- .../transport/FailAndRetryMockTransport.java | 192 ++++++++++ .../InternalTransportClientTests.java | 340 ++++++++++++++++++ .../TransportClientNodesServiceTests.java | 174 +++++++++ .../transport/TransportClientRetryTests.java | 96 +++++ .../test/InternalTestCluster.java | 4 +- 12 files changed, 829 insertions(+), 111 deletions(-) create mode 100644 src/test/java/org/elasticsearch/client/transport/FailAndRetryMockTransport.java create mode 100644 src/test/java/org/elasticsearch/client/transport/InternalTransportClientTests.java create mode 100644 src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java 
create mode 100644 src/test/java/org/elasticsearch/client/transport/TransportClientRetryTests.java diff --git a/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java b/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java index 461ef1ab62c..925a0bc24c4 100644 --- a/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java +++ b/src/main/java/org/elasticsearch/action/TransportActionNodeProxy.java @@ -19,8 +19,6 @@ package org.elasticsearch.action; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -31,8 +29,6 @@ import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import static org.elasticsearch.action.support.PlainActionFuture.newFuture; - /** * A generic proxy that will execute the given action against a specific node. 
*/ @@ -52,13 +48,6 @@ public class TransportActionNodeProxy execute(DiscoveryNode node, Request request) throws ElasticsearchException { - PlainActionFuture future = newFuture(); - request.listenerThreaded(false); - execute(node, request, future); - return future; - } - public void execute(DiscoveryNode node, final Request request, final ActionListener listener) { ActionRequestValidationException validationException = request.validate(); if (validationException != null) { diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 0fa28baa951..1a6570ffc51 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -111,9 +111,6 @@ public class TransportClientNodesService extends AbstractComponent { this.nodesSampler = new SimpleNodeSampler(); } this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler()); - - // we want the transport service to throw connect exceptions, so we can retry - transportService.throwConnectException(true); } public ImmutableList transportAddresses() { @@ -190,25 +187,6 @@ public class TransportClientNodesService extends AbstractComponent { return this; } - public T execute(NodeCallback callback) throws ElasticsearchException { - ImmutableList nodes = this.nodes; - ensureNodesAreAvailable(nodes); - int index = getNodeNumber(); - for (int i = 0; i < nodes.size(); i++) { - DiscoveryNode node = nodes.get((index + i) % nodes.size()); - try { - return callback.doWithNode(node); - } catch (ElasticsearchException e) { - if (e.unwrapCause() instanceof ConnectTransportException) { - logConnectTransportException((ConnectTransportException) e.unwrapCause()); - } else { - throw e; - } - } - } - throw new NoNodeAvailableException("None of the 
configured nodes were available: " + nodes); - } - public void execute(NodeListenerCallback callback, ActionListener listener) throws ElasticsearchException { ImmutableList nodes = this.nodes; ensureNodesAreAvailable(nodes); @@ -217,12 +195,9 @@ public class TransportClientNodesService extends AbstractComponent { DiscoveryNode node = nodes.get((index) % nodes.size()); try { callback.doWithNode(node, retryListener); - } catch (ElasticsearchException e) { - if (e.unwrapCause() instanceof ConnectTransportException) { - retryListener.onFailure(e); - } else { - throw e; - } + } catch (Throwable t) { + //this exception can't come from the TransportService as it doesn't throw exception at all + listener.onFailure(t); } } @@ -255,9 +230,9 @@ public class TransportClientNodesService extends AbstractComponent { } else { try { callback.doWithNode(nodes.get((index + i) % nodes.size()), this); - } catch (Throwable e1) { - // retry the next one... - onFailure(e1); + } catch(Throwable t) { + //this exception can't come from the TransportService as it doesn't throw exceptions at all + listener.onFailure(t); } } } else { @@ -299,14 +274,6 @@ public class TransportClientNodesService extends AbstractComponent { } } - private void logConnectTransportException(ConnectTransportException connectTransportException) { - if (logger.isTraceEnabled()) { - logger.trace("Could not connect to [{}] for action [{}], error [{}] [{}]", connectTransportException, connectTransportException.node(), connectTransportException.action(), connectTransportException.status().name(), connectTransportException.getMessage()); - } else { - logger.debug("Could not connect to [{}] for action [{}], error [{}] [{}]", connectTransportException.node(), connectTransportException.action(), connectTransportException.status().name(), connectTransportException.getMessage()); - } - } - abstract class NodeSampler { public void sample() { synchronized (mutex) { @@ -507,13 +474,8 @@ public class TransportClientNodesService 
extends AbstractComponent { } } - public static interface NodeCallback { - - T doWithNode(DiscoveryNode node) throws ElasticsearchException; - } - public static interface NodeListenerCallback { - void doWithNode(DiscoveryNode node, ActionListener listener) throws ElasticsearchException; + void doWithNode(DiscoveryNode node, ActionListener listener); } } diff --git a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportAdminClient.java b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportAdminClient.java index be7d8d8fb5e..d2f51160d8b 100644 --- a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportAdminClient.java +++ b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportAdminClient.java @@ -22,7 +22,6 @@ package org.elasticsearch.client.transport.support; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.ClusterAdminClient; import org.elasticsearch.client.IndicesAdminClient; -import org.elasticsearch.client.transport.TransportClientNodesService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -32,17 +31,13 @@ import org.elasticsearch.common.settings.Settings; */ public class InternalTransportAdminClient extends AbstractComponent implements AdminClient { - private final TransportClientNodesService nodesService; - private final InternalTransportIndicesAdminClient indicesAdminClient; private final InternalTransportClusterAdminClient clusterAdminClient; @Inject - public InternalTransportAdminClient(Settings settings, TransportClientNodesService nodesService, - InternalTransportIndicesAdminClient indicesAdminClient, InternalTransportClusterAdminClient clusterAdminClient) { + public InternalTransportAdminClient(Settings settings, InternalTransportIndicesAdminClient indicesAdminClient, InternalTransportClusterAdminClient clusterAdminClient) { 
super(settings); - this.nodesService = nodesService; this.indicesAdminClient = indicesAdminClient; this.clusterAdminClient = clusterAdminClient; } diff --git a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java index ebbc593bd63..fcf6b0d3d39 100644 --- a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java +++ b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java @@ -20,8 +20,8 @@ package org.elasticsearch.client.transport.support; import com.google.common.collect.ImmutableMap; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.*; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.support.AbstractClient; @@ -89,13 +89,9 @@ public class InternalTransportClient extends AbstractClient { @SuppressWarnings("unchecked") @Override public > ActionFuture execute(final Action action, final Request request) { - final TransportActionNodeProxy proxy = actions.get(action); - return nodesService.execute(new TransportClientNodesService.NodeCallback>() { - @Override - public ActionFuture doWithNode(DiscoveryNode node) throws ElasticsearchException { - return proxy.execute(node, request); - } - }); + PlainActionFuture actionFuture = PlainActionFuture.newFuture(); + execute(action, request, actionFuture); + return actionFuture; } @SuppressWarnings("unchecked") @@ -104,7 +100,7 @@ public class InternalTransportClient extends AbstractClient { final TransportActionNodeProxy proxy = actions.get(action); nodesService.execute(new TransportClientNodesService.NodeListenerCallback() { @Override - public void doWithNode(DiscoveryNode node, ActionListener listener) throws ElasticsearchException { + public void doWithNode(DiscoveryNode node, 
ActionListener listener) { proxy.execute(node, request, listener); } }, listener); diff --git a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java index e5afe1af5e8..0c864b3936b 100644 --- a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java +++ b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java @@ -20,9 +20,9 @@ package org.elasticsearch.client.transport.support; import com.google.common.collect.ImmutableMap; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.*; import org.elasticsearch.action.admin.cluster.ClusterAction; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.ClusterAdminClient; import org.elasticsearch.client.support.AbstractClusterAdminClient; import org.elasticsearch.client.transport.TransportClientNodesService; @@ -69,13 +69,9 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli @SuppressWarnings("unchecked") @Override public > ActionFuture execute(final Action action, final Request request) { - final TransportActionNodeProxy proxy = actions.get(action); - return nodesService.execute(new TransportClientNodesService.NodeCallback>() { - @Override - public ActionFuture doWithNode(DiscoveryNode node) throws ElasticsearchException { - return proxy.execute(node, request); - } - }); + PlainActionFuture actionFuture = PlainActionFuture.newFuture(); + execute(action, request, actionFuture); + return actionFuture; } @SuppressWarnings("unchecked") @@ -84,7 +80,7 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli final TransportActionNodeProxy proxy = actions.get(action); nodesService.execute(new TransportClientNodesService.NodeListenerCallback() { @Override - public void 
doWithNode(DiscoveryNode node, ActionListener listener) throws ElasticsearchException { + public void doWithNode(DiscoveryNode node, ActionListener listener) { proxy.execute(node, request, listener); } }, listener); diff --git a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java index e9941de9a4b..ee416e29cab 100644 --- a/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java +++ b/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java @@ -20,9 +20,9 @@ package org.elasticsearch.client.transport.support; import com.google.common.collect.ImmutableMap; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.*; import org.elasticsearch.action.admin.indices.IndicesAction; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.support.AbstractIndicesAdminClient; import org.elasticsearch.client.transport.TransportClientNodesService; @@ -69,13 +69,9 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli @SuppressWarnings("unchecked") @Override public > ActionFuture execute(final Action action, final Request request) { - final TransportActionNodeProxy proxy = actions.get(action); - return nodesService.execute(new TransportClientNodesService.NodeCallback>() { - @Override - public ActionFuture doWithNode(DiscoveryNode node) throws ElasticsearchException { - return proxy.execute(node, request); - } - }); + PlainActionFuture actionFuture = PlainActionFuture.newFuture(); + execute(action, request, actionFuture); + return actionFuture; } @SuppressWarnings("unchecked") @@ -84,7 +80,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli final TransportActionNodeProxy proxy = 
actions.get(action); nodesService.execute(new TransportClientNodesService.NodeListenerCallback() { @Override - public void doWithNode(DiscoveryNode node, ActionListener listener) throws ElasticsearchException { + public void doWithNode(DiscoveryNode node, ActionListener listener) { proxy.execute(node, request, listener); } }, listener); diff --git a/src/main/java/org/elasticsearch/transport/TransportService.java b/src/main/java/org/elasticsearch/transport/TransportService.java index e66689b59b8..604af502ac0 100644 --- a/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/src/main/java/org/elasticsearch/transport/TransportService.java @@ -69,7 +69,6 @@ public class TransportService extends AbstractLifecycleComponenttrue to indicate that a {@link ConnectTransportException} should be thrown when - * sending a message (otherwise, it will be passed to the response handler). Defaults to false. - *

- *

This is useful when logic based on connect failure is needed without having to wrap the handler, - * for example, in case of retries across several nodes. - */ - public void throwConnectException(boolean throwConnectException) { - this.throwConnectException = throwConnectException; - } - public TransportFuture submitRequest(DiscoveryNode node, String action, TransportRequest request, TransportResponseHandler handler) throws TransportException { return submitRequest(node, action, request, TransportRequestOptions.EMPTY, handler); @@ -190,12 +178,12 @@ public class TransportService extends AbstractLifecycleComponent void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, - final TransportResponseHandler handler) throws TransportException { + final TransportResponseHandler handler) { sendRequest(node, action, request, TransportRequestOptions.EMPTY, handler); } public void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request, - final TransportRequestOptions options, TransportResponseHandler handler) throws TransportException { + final TransportRequestOptions options, TransportResponseHandler handler) { if (node == null) { throw new ElasticsearchIllegalStateException("can't send request to a null node"); } @@ -229,12 +217,6 @@ public class TransportService extends AbstractLifecycleComponent implements Transport { + + private final Random random; + + private boolean connectMode = true; + + private TransportServiceAdapter transportServiceAdapter; + + private final AtomicInteger connectTransportExceptions = new AtomicInteger(); + private final AtomicInteger failures = new AtomicInteger(); + private final AtomicInteger successes = new AtomicInteger(); + private final Set triedNodes = new CopyOnWriteArraySet<>(); + + FailAndRetryMockTransport(Random random) { + this.random = new Random(random.nextLong()); + } + + @Override + @SuppressWarnings("unchecked") + public void sendRequest(DiscoveryNode node, 
long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + + //we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info + if (connectMode) { + TransportResponseHandler transportResponseHandler = transportServiceAdapter.remove(requestId); + NodeInfo nodeInfo = new NodeInfo(Version.CURRENT, Build.CURRENT, node, null, null, null, null, null, null, null, null, null, null); + NodesInfoResponse nodesInfoResponse = new NodesInfoResponse(ClusterName.DEFAULT, new NodeInfo[]{nodeInfo}); + transportResponseHandler.handleResponse(nodesInfoResponse); + return; + } + + //once nodes are connected we'll just return errors for each sendRequest call + triedNodes.add(node); + + if (RandomInts.randomInt(random, 100) > 10) { + connectTransportExceptions.incrementAndGet(); + throw new ConnectTransportException(node, "node not available"); + } else { + if (random.nextBoolean()) { + failures.incrementAndGet(); + //throw whatever exception that is not a subclass of ConnectTransportException + throw new IllegalStateException(); + } else { + TransportResponseHandler transportResponseHandler = transportServiceAdapter.remove(requestId); + if (random.nextBoolean()) { + successes.incrementAndGet(); + transportResponseHandler.handleResponse(newResponse()); + } else { + failures.incrementAndGet(); + transportResponseHandler.handleException(new TransportException("transport exception")); + } + } + } + } + + protected abstract Response newResponse(); + + public void endConnectMode() { + this.connectMode = false; + } + + public int connectTransportExceptions() { + return connectTransportExceptions.get(); + } + + public int failures() { + return failures.get(); + } + + public int successes() { + return successes.get(); + } + + public Set triedNodes() { + return triedNodes; + } + + @Override + public void transportServiceAdapter(TransportServiceAdapter 
transportServiceAdapter) { + this.transportServiceAdapter = transportServiceAdapter; + } + + @Override + public BoundTransportAddress boundAddress() { + return null; + } + + @Override + public TransportAddress[] addressesFromString(String address) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addressSupported(Class address) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean nodeConnected(DiscoveryNode node) { + return false; + } + + @Override + public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + + } + + @Override + public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException { + + } + + @Override + public void disconnectFromNode(DiscoveryNode node) { + + } + + @Override + public long serverOpen() { + return 0; + } + + @Override + public Lifecycle.State lifecycleState() { + return null; + } + + @Override + public void addLifecycleListener(LifecycleListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public void removeLifecycleListener(LifecycleListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public Transport start() throws ElasticsearchException { + return null; + } + + @Override + public Transport stop() throws ElasticsearchException { + return null; + } + + @Override + public void close() throws ElasticsearchException { + + } +} diff --git a/src/test/java/org/elasticsearch/client/transport/InternalTransportClientTests.java b/src/test/java/org/elasticsearch/client/transport/InternalTransportClientTests.java new file mode 100644 index 00000000000..b420be33354 --- /dev/null +++ b/src/test/java/org/elasticsearch/client/transport/InternalTransportClientTests.java @@ -0,0 +1,340 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.transport; + +import org.elasticsearch.Version; +import org.elasticsearch.action.*; +import org.elasticsearch.action.admin.cluster.ClusterAction; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; +import org.elasticsearch.action.admin.indices.IndicesAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.ClusterAdminClient; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.client.transport.support.InternalTransportAdminClient; +import org.elasticsearch.client.transport.support.InternalTransportClient; +import org.elasticsearch.client.transport.support.InternalTransportClusterAdminClient; +import org.elasticsearch.client.transport.support.InternalTransportIndicesAdminClient; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.junit.Test; + +import java.io.Closeable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; + +public class InternalTransportClientTests extends ElasticsearchTestCase { + + private static class TestIteration implements Closeable { + private final ThreadPool threadPool; + private final FailAndRetryMockTransport transport; + private final TransportService transportService; + private final TransportClientNodesService transportClientNodesService; + private final InternalTransportClient internalTransportClient; + private final int nodesCount; + + TestIteration() { + threadPool = new ThreadPool("internal-transport-client-tests"); + transport = new FailAndRetryMockTransport(getRandom()) { + @Override + protected TestResponse newResponse() { + return new TestResponse(); + } + }; + transportService = new TransportService(ImmutableSettings.EMPTY, transport, threadPool); + transportService.start(); + transportClientNodesService = new TransportClientNodesService(ImmutableSettings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT); + Map actions = new HashMap<>(); + actions.put(NodesInfoAction.NAME, NodesInfoAction.INSTANCE); + actions.put(TestAction.NAME, TestAction.INSTANCE); + actions.put(IndicesAdminTestAction.NAME, IndicesAdminTestAction.INSTANCE); + actions.put(ClusterAdminTestAction.NAME, ClusterAdminTestAction.INSTANCE); + + InternalTransportIndicesAdminClient indicesAdminClient = new InternalTransportIndicesAdminClient(ImmutableSettings.EMPTY, transportClientNodesService, transportService, threadPool, actions); + InternalTransportClusterAdminClient clusterAdminClient = new InternalTransportClusterAdminClient(ImmutableSettings.EMPTY, transportClientNodesService, threadPool, transportService, actions); + InternalTransportAdminClient adminClient = new 
InternalTransportAdminClient(ImmutableSettings.EMPTY, indicesAdminClient, clusterAdminClient); + internalTransportClient = new InternalTransportClient(ImmutableSettings.EMPTY, threadPool, transportService, transportClientNodesService, adminClient, actions); + + nodesCount = randomIntBetween(1, 10); + for (int i = 0; i < nodesCount; i++) { + transportClientNodesService.addTransportAddresses(new LocalTransportAddress("node" + i)); + } + transport.endConnectMode(); + } + + public void close() { + threadPool.shutdown(); + try { + threadPool.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().isInterrupted(); + } + transportService.stop(); + transportClientNodesService.close(); + internalTransportClient.close(); + } + } + + @Test + public void testListenerFailures() throws InterruptedException { + + int iters = iterations(10, 100); + for (int i = 0; i < iters; i++) { + try(final TestIteration iteration = new TestIteration()) { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger finalFailures = new AtomicInteger(); + final AtomicReference finalFailure = new AtomicReference<>(); + final AtomicReference response = new AtomicReference<>(); + ActionListener actionListener = new ActionListener() { + @Override + public void onResponse(TestResponse testResponse) { + response.set(testResponse); + latch.countDown(); + } + + @Override + public void onFailure(Throwable e) { + finalFailures.incrementAndGet(); + finalFailure.set(e); + latch.countDown(); + } + }; + + final AtomicInteger preSendFailures = new AtomicInteger(); + + Action action = randomFrom(Action.values()); + action.execute(iteration, actionListener); + + assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true)); + + //there can be only either one failure that causes the request to fail straightaway or success + assertThat(preSendFailures.get() + iteration.transport.failures() + iteration.transport.successes(), lessThanOrEqualTo(1)); + + if 
(iteration.transport.successes() == 1) { + assertThat(finalFailures.get(), equalTo(0)); + assertThat(finalFailure.get(), nullValue()); + assertThat(response.get(), notNullValue()); + } else { + assertThat(finalFailures.get(), equalTo(1)); + assertThat(finalFailure.get(), notNullValue()); + assertThat(response.get(), nullValue()); + if (preSendFailures.get() == 0 && iteration.transport.failures() == 0) { + assertThat(finalFailure.get(), instanceOf(NoNodeAvailableException.class)); + } + } + + assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount)); + assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes())); + } + } + } + + @Test + public void testSyncFailures() throws InterruptedException { + + int iters = iterations(10, 100); + for (int i = 0; i < iters; i++) { + try(final TestIteration iteration = new TestIteration()) { + TestResponse testResponse = null; + Throwable finalFailure = null; + + try { + Action action = randomFrom(Action.values()); + ActionFuture future = action.execute(iteration); + testResponse = future.actionGet(); + } catch (Throwable t) { + finalFailure = t; + } + + //there can be only either one failure that causes the request to fail straightaway or success + assertThat(iteration.transport.failures() + iteration.transport.successes(), lessThanOrEqualTo(1)); + + if (iteration.transport.successes() == 1) { + assertThat(finalFailure, nullValue()); + assertThat(testResponse, notNullValue()); + } else { + assertThat(testResponse, nullValue()); + assertThat(finalFailure, notNullValue()); + if (iteration.transport.failures() == 0) { + assertThat(finalFailure, instanceOf(NoNodeAvailableException.class)); + } + } + + assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount)); + assertThat(iteration.transport.triedNodes().size(), 
equalTo(iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes()));
+            }
+        }
+    }
+
+    private static enum Action { // one constant per client entry point: the client itself, admin().indices() and admin().cluster()
+        TEST {
+            @Override
+            ActionFuture execute(TestIteration iteration) {
+                return iteration.internalTransportClient.execute(TestAction.INSTANCE, new TestRequest());
+            }
+
+            @Override
+            void execute(TestIteration iteration, ActionListener listener) {
+                iteration.internalTransportClient.execute(TestAction.INSTANCE, new TestRequest(), listener);
+            }
+        },
+        INDICES_ADMIN {
+            @Override
+            ActionFuture execute(TestIteration iteration) {
+                return iteration.internalTransportClient.admin().indices().execute(IndicesAdminTestAction.INSTANCE, new TestRequest());
+            }
+
+            @Override
+            void execute(TestIteration iteration, ActionListener listener) {
+                iteration.internalTransportClient.admin().indices().execute(IndicesAdminTestAction.INSTANCE, new TestRequest(), listener);
+            }
+        },
+        CLUSTER_ADMIN {
+            @Override
+            ActionFuture execute(TestIteration iteration) {
+                return iteration.internalTransportClient.admin().cluster().execute(ClusterAdminTestAction.INSTANCE, new TestRequest());
+            }
+
+            @Override
+            void execute(TestIteration iteration, ActionListener listener) {
+                iteration.internalTransportClient.admin().cluster().execute(ClusterAdminTestAction.INSTANCE, new TestRequest(), listener);
+            }
+        };
+
+        abstract ActionFuture execute(TestIteration iteration); // variant that hands back a future; NOTE(review): type arguments look lost in extraction here and below - confirm
+
+        abstract void execute(TestIteration iteration, ActionListener listener); // variant that reports to the given listener
+    }
+
+    private static class TestRequest extends ActionRequest { // request stub shared by all three actions
+        @Override
+        public ActionRequestValidationException validate() {
+            return null; // presumably null means "no validation error" - confirm against ActionRequest
+        }
+    }
+
+    private static class TestResponse extends ActionResponse { // empty response stub
+
+    }
+
+    private static class TestAction extends ClientAction { // action stub executed through the client itself
+        static final String NAME = "test-action";
+        static final TestAction INSTANCE = new TestAction(NAME);
+
+        private TestAction(String name) {
+            super(name);
+        }
+
+        @Override
+        public TestRequestBuilder newRequestBuilder(Client client) {
+            throw new UnsupportedOperationException(); // builder creation is not supported by this stub
+        }
+
+        @Override
+        public TestResponse newResponse() {
+            return new TestResponse();
+        }
+    }
+
+    private static class TestRequestBuilder extends ActionRequestBuilder { // builder type named by TestAction; its doExecute always throws
+
+        protected TestRequestBuilder(Client client, TestRequest request) {
+            super(client, request);
+        }
+
+        @Override
+        protected void doExecute(ActionListener listener) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private static class IndicesAdminTestAction extends IndicesAction { // action stub executed through admin().indices()
+        static final String NAME = "test-indices-action";
+        static final IndicesAdminTestAction INSTANCE = new IndicesAdminTestAction(NAME);
+
+        private IndicesAdminTestAction(String name) {
+            super(name);
+        }
+
+        @Override
+        public IndicesAdminTestRequestBuilder newRequestBuilder(IndicesAdminClient client) {
+            throw new UnsupportedOperationException(); // builder creation is not supported by this stub
+        }
+
+        @Override
+        public TestResponse newResponse() {
+            return new TestResponse();
+        }
+    }
+
+    private static class IndicesAdminTestRequestBuilder extends ActionRequestBuilder { // builder type named by IndicesAdminTestAction; its doExecute always throws
+
+        protected IndicesAdminTestRequestBuilder(IndicesAdminClient client, TestRequest request) {
+            super(client, request);
+        }
+
+        @Override
+        protected void doExecute(ActionListener listener) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private static class ClusterAdminTestAction extends ClusterAction { // action stub executed through admin().cluster()
+        static final String NAME = "test-cluster-action";
+        static final ClusterAdminTestAction INSTANCE = new ClusterAdminTestAction(NAME);
+
+        private ClusterAdminTestAction(String name) {
+            super(name);
+        }
+
+        @Override
+        public ClusterAdminTestRequestBuilder newRequestBuilder(ClusterAdminClient client) {
+            throw new UnsupportedOperationException(); // builder creation is not supported by this stub
+        }
+
+        @Override
+        public TestResponse newResponse() {
+            return new TestResponse();
+        }
+    }
+
+    private static class ClusterAdminTestRequestBuilder extends ActionRequestBuilder { // builder type named by ClusterAdminTestAction; its doExecute always throws
+
+        protected ClusterAdminTestRequestBuilder(ClusterAdminClient client, TestRequest request) {
+            super(client, request);
+        }
+
+        @Override
+        protected void doExecute(ActionListener listener) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git a/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java
new file mode 100644
index 00000000000..e96981a7646
--- /dev/null
+++ b/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.transport;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.transport.LocalTransportAddress;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.*;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.notNullValue;
+
+public class TransportClientNodesServiceTests extends ElasticsearchTestCase { // drives TransportClientNodesService.execute over a FailAndRetryMockTransport and checks what the caller's listener receives
+
+    private static class TestIteration implements Closeable { // per-iteration fixture: thread pool, mock transport, transport service and nodes service
+        private final ThreadPool threadPool;
+        private final FailAndRetryMockTransport transport; // NOTE(review): raw type as extracted; a <TestResponse> argument may have been lost - confirm
+        private final TransportService transportService;
+        private final TransportClientNodesService transportClientNodesService;
+        private final int nodesCount; // number of transport addresses registered below (1..10)
+
+        TestIteration() {
+            threadPool = new ThreadPool("transport-client-nodes-service-tests");
+            transport = new FailAndRetryMockTransport(getRandom()) {
+                @Override
+                protected TestResponse newResponse() {
+                    return new TestResponse();
+                }
+            };
+            transportService = new TransportService(ImmutableSettings.EMPTY, transport, threadPool);
+            transportService.start();
+            transportClientNodesService = new TransportClientNodesService(ImmutableSettings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT);
+
+            nodesCount = randomIntBetween(1, 10);
+            for (int i = 0; i < nodesCount; i++) {
+                transportClientNodesService.addTransportAddresses(new LocalTransportAddress("node" + i));
+            }
+            transport.endConnectMode(); // called only after every address has been added
+        }
+
+        public void close() { // shuts the pool down (waiting up to 1s), then stops the transport service and closes the nodes service
+            threadPool.shutdown();
+            try {
+                threadPool.awaitTermination(1, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // restore the interrupt flag; isInterrupted() only read it and dropped the interruption
+            }
+            transportService.stop();
+            transportClientNodesService.close();
+        }
+    }
+
+    @Test
+    public void testListenerFailures() throws InterruptedException { // each iteration sends one request through the nodes service and inspects the final listener outcome
+
+        int iters = iterations(10, 100);
+        for (int i = 0; i < iters; i++) {
+            try (final TestIteration iteration = new TestIteration()) { // fresh fixture per iteration, closed when the block exits
+                final CountDownLatch latch = new CountDownLatch(1); // released by the first response or failure delivered to actionListener
+                final AtomicInteger finalFailures = new AtomicInteger();
+                final AtomicReference<Throwable> finalFailure = new AtomicReference<>();
+                final AtomicReference<TestResponse> response = new AtomicReference<>();
+                ActionListener<TestResponse> actionListener = new ActionListener<TestResponse>() {
+                    @Override
+                    public void onResponse(TestResponse testResponse) {
+                        response.set(testResponse);
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void onFailure(Throwable e) {
+                        finalFailures.incrementAndGet();
+                        finalFailure.set(e);
+                        latch.countDown();
+                    }
+                };
+
+                final AtomicInteger preSendFailures = new AtomicInteger(); // counts exceptions thrown by the callback before anything is sent
+
+                iteration.transportClientNodesService.execute(new TransportClientNodesService.NodeListenerCallback<TestResponse>() {
+                    @Override
+                    public void doWithNode(DiscoveryNode node, final ActionListener<TestResponse> retryListener) {
+                        if (rarely()) {
+                            preSendFailures.incrementAndGet();
+                            //throw whatever exception that is not a subclass of ConnectTransportException
+                            throw new IllegalArgumentException();
+                        }
+
+                        iteration.transportService.sendRequest(node, "action", new TestRequest(), new TransportRequestOptions().withTimeout(50), new BaseTransportResponseHandler<TestResponse>() {
+                            @Override
+                            public TestResponse newInstance() {
+                                return new TestResponse();
+                            }
+
+                            @Override
+                            public void handleResponse(TestResponse response) {
+                                retryListener.onResponse(response);
+                            }
+
+                            @Override
+                            public void handleException(TransportException exp) {
+                                retryListener.onFailure(exp);
+                            }
+
+                            @Override
+                            public String executor() {
+                                return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
+                            }
+                        });
+                    }
+                }, actionListener);
+
+                assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true));
+
+                //there can be only either one failure that causes the request to fail straightaway or success
+                assertThat(preSendFailures.get() + iteration.transport.failures() + iteration.transport.successes(), lessThanOrEqualTo(1));
+
+                if (iteration.transport.successes() == 1) { // success path: no failure recorded and a response was delivered
+                    assertThat(finalFailures.get(), equalTo(0));
+                    assertThat(finalFailure.get(), nullValue());
+                    assertThat(response.get(), notNullValue());
+                } else { // failure path: exactly one failure recorded and no response
+                    assertThat(finalFailures.get(), equalTo(1));
+                    assertThat(finalFailure.get(), notNullValue());
+                    assertThat(response.get(), nullValue());
+                    if (preSendFailures.get() == 0 && iteration.transport.failures() == 0) {
+                        assertThat(finalFailure.get(), instanceOf(NoNodeAvailableException.class));
+                    }
+                }
+
+                assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount));
+                assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes()));
+            }
+        }
+    }
+
+    private static class TestRequest extends TransportRequest { // empty request stub
+
+    }
+
+    private static class TestResponse extends TransportResponse { // empty response stub
+
+    }
+}
diff --git a/src/test/java/org/elasticsearch/client/transport/TransportClientRetryTests.java b/src/test/java/org/elasticsearch/client/transport/TransportClientRetryTests.java
new file mode 100644
index 00000000000..c1d11bb2312
--- /dev/null
+++ b/src/test/java/org/elasticsearch/client/transport/TransportClientRetryTests.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. 
Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.transport;
+
+import com.google.common.base.Predicate;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.support.PlainListenableActionFuture;
+import org.elasticsearch.client.Requests;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.plugins.PluginsService;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.transport.TransportService;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
+import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
+@ClusterScope(scope = Scope.TEST, numClientNodes = 0)
+public class TransportClientRetryTests extends ElasticsearchIntegrationTest { // a TransportClient must keep answering cluster-state requests while nodes are stopped one by one
+
+    @Test
+    public void testRetry() throws IOException, ExecutionException, InterruptedException { // stops nodes in a loop and issues a local cluster-state request after each stop
+
+        Iterable<TransportService> instances = internalCluster().getInstances(TransportService.class);
+        TransportAddress[] addresses = new TransportAddress[internalCluster().size()];
+        int i = 0;
+        for (TransportService instance : instances) { // collect the publish address of every TransportService instance
+            addresses[i++] = instance.boundAddress().publishAddress();
+        }
+
+        ImmutableSettings.Builder builder = settingsBuilder().put("client.transport.nodes_sampler_interval", "1s")
+                .put("name", "transport_client_retry_test")
+                .put("node.mode", InternalTestCluster.nodeMode())
+                .put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false)
+                .put(ClusterName.SETTING, internalCluster().getClusterName())
+                .put("config.ignore_system_properties", true);
+
+        try (TransportClient transportClient = new TransportClient(builder.build())) {
+            transportClient.addTransportAddresses(addresses);
+            assertThat(transportClient.connectedNodes().size(), equalTo(internalCluster().size()));
+
+            int size = cluster().size();
+            //kill all nodes one by one, leaving a single master/data node at the end of the loop
+            for (int j = 1; j < size; j++) {
+                internalCluster().stopRandomNode(new Predicate<Settings>() {
+                    @Override
+                    public boolean apply(Settings input) {
+                        return true; // accepts any node's settings
+                    }
+                });
+
+                ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest().local(true);
+                ClusterState clusterState;
+                //use both variants of execute method: with and without listener
+                if (randomBoolean()) {
+                    clusterState = transportClient.admin().cluster().state(clusterStateRequest).get().getState();
+                } else {
+                    PlainListenableActionFuture<ClusterStateResponse> future = new PlainListenableActionFuture<>(clusterStateRequest.listenerThreaded(), transportClient.threadPool());
+                    transportClient.admin().cluster().state(clusterStateRequest, future);
+                    clusterState = future.get().getState();
+                }
+                assertThat(clusterState.nodes().size(), greaterThanOrEqualTo(size - j));
+                assertThat(transportClient.connectedNodes().size(), greaterThanOrEqualTo(size - j));
+            }
+        }
+    }
+}
diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 2d5b4d7e188..9c3f6d434cf 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -147,7 +147,7 @@ public final class InternalTestCluster extends TestCluster { static final boolean DEFAULT_ENABLE_RANDOM_BENCH_NODES = true; - private static final String NODE_MODE = nodeMode(); + static final String NODE_MODE = nodeMode(); /* sorted map to make traverse order reproducible, concurrent since we do checks on it not within a sync block */ private final NavigableMap nodes = new TreeMap<>(); @@ -259,7 +259,7 @@ public final class InternalTestCluster extends TestCluster { } - private static String nodeMode() { + public static String nodeMode() { Builder builder = ImmutableSettings.builder(); if (Strings.isEmpty(System.getProperty("es.node.mode"))&& Strings.isEmpty(System.getProperty("es.node.local"))) { return "local"; // default if nothing is specified