Transport Client: fixed the node retry mechanism which could fail without trying all the connected nodes

The RetryListener was notified twice for each single failure. This caused some additional retries but, more importantly, made the client reach the maximum number of retries (the number of connected nodes) too quickly, while ongoing retries that could still succeed had not yet completed.

With throwConnectException set to true, the TransportService used to throw ConnectTransportException and, in addition, notify the listener of any exception received, from a separate thread through the request holder.

Simplified exception handling by just removing the throwConnectException option from the TransportService, used only in the transport client. The transport client now relies solely on the request holder to notify of failures and eventually retry.

Closes #6829
This commit is contained in:
javanna 2014-07-11 12:23:20 +02:00 committed by Luca Cavanna
parent eecbf8a559
commit fcf4d5a38d
12 changed files with 829 additions and 111 deletions

View File

@ -19,8 +19,6 @@
package org.elasticsearch.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -31,8 +29,6 @@ import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
/**
* A generic proxy that will execute the given action against a specific node.
*/
@ -52,13 +48,6 @@ public class TransportActionNodeProxy<Request extends ActionRequest, Response ex
this.transportOptions = action.transportOptions(settings);
}
public ActionFuture<Response> execute(DiscoveryNode node, Request request) throws ElasticsearchException {
PlainActionFuture<Response> future = newFuture();
request.listenerThreaded(false);
execute(node, request, future);
return future;
}
public void execute(DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {

View File

@ -111,9 +111,6 @@ public class TransportClientNodesService extends AbstractComponent {
this.nodesSampler = new SimpleNodeSampler();
}
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
// we want the transport service to throw connect exceptions, so we can retry
transportService.throwConnectException(true);
}
public ImmutableList<TransportAddress> transportAddresses() {
@ -190,25 +187,6 @@ public class TransportClientNodesService extends AbstractComponent {
return this;
}
public <T> T execute(NodeCallback<T> callback) throws ElasticsearchException {
ImmutableList<DiscoveryNode> nodes = this.nodes;
ensureNodesAreAvailable(nodes);
int index = getNodeNumber();
for (int i = 0; i < nodes.size(); i++) {
DiscoveryNode node = nodes.get((index + i) % nodes.size());
try {
return callback.doWithNode(node);
} catch (ElasticsearchException e) {
if (e.unwrapCause() instanceof ConnectTransportException) {
logConnectTransportException((ConnectTransportException) e.unwrapCause());
} else {
throw e;
}
}
}
throw new NoNodeAvailableException("None of the configured nodes were available: " + nodes);
}
public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) throws ElasticsearchException {
ImmutableList<DiscoveryNode> nodes = this.nodes;
ensureNodesAreAvailable(nodes);
@ -217,12 +195,9 @@ public class TransportClientNodesService extends AbstractComponent {
DiscoveryNode node = nodes.get((index) % nodes.size());
try {
callback.doWithNode(node, retryListener);
} catch (ElasticsearchException e) {
if (e.unwrapCause() instanceof ConnectTransportException) {
retryListener.onFailure(e);
} else {
throw e;
}
} catch (Throwable t) {
//this exception can't come from the TransportService as it doesn't throw exception at all
listener.onFailure(t);
}
}
@ -255,9 +230,9 @@ public class TransportClientNodesService extends AbstractComponent {
} else {
try {
callback.doWithNode(nodes.get((index + i) % nodes.size()), this);
} catch (Throwable e1) {
// retry the next one...
onFailure(e1);
} catch(Throwable t) {
//this exception can't come from the TransportService as it doesn't throw exceptions at all
listener.onFailure(t);
}
}
} else {
@ -299,14 +274,6 @@ public class TransportClientNodesService extends AbstractComponent {
}
}
private void logConnectTransportException(ConnectTransportException connectTransportException) {
if (logger.isTraceEnabled()) {
logger.trace("Could not connect to [{}] for action [{}], error [{}] [{}]", connectTransportException, connectTransportException.node(), connectTransportException.action(), connectTransportException.status().name(), connectTransportException.getMessage());
} else {
logger.debug("Could not connect to [{}] for action [{}], error [{}] [{}]", connectTransportException.node(), connectTransportException.action(), connectTransportException.status().name(), connectTransportException.getMessage());
}
}
abstract class NodeSampler {
public void sample() {
synchronized (mutex) {
@ -507,13 +474,8 @@ public class TransportClientNodesService extends AbstractComponent {
}
}
public static interface NodeCallback<T> {
T doWithNode(DiscoveryNode node) throws ElasticsearchException;
}
public static interface NodeListenerCallback<Response> {
void doWithNode(DiscoveryNode node, ActionListener<Response> listener) throws ElasticsearchException;
void doWithNode(DiscoveryNode node, ActionListener<Response> listener);
}
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.client.transport.support;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -32,17 +31,13 @@ import org.elasticsearch.common.settings.Settings;
*/
public class InternalTransportAdminClient extends AbstractComponent implements AdminClient {
private final TransportClientNodesService nodesService;
private final InternalTransportIndicesAdminClient indicesAdminClient;
private final InternalTransportClusterAdminClient clusterAdminClient;
@Inject
public InternalTransportAdminClient(Settings settings, TransportClientNodesService nodesService,
InternalTransportIndicesAdminClient indicesAdminClient, InternalTransportClusterAdminClient clusterAdminClient) {
public InternalTransportAdminClient(Settings settings, InternalTransportIndicesAdminClient indicesAdminClient, InternalTransportClusterAdminClient clusterAdminClient) {
super(settings);
this.nodesService = nodesService;
this.indicesAdminClient = indicesAdminClient;
this.clusterAdminClient = clusterAdminClient;
}

View File

@ -20,8 +20,8 @@
package org.elasticsearch.client.transport.support;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.*;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
@ -89,13 +89,9 @@ public class InternalTransportClient extends AbstractClient {
@SuppressWarnings("unchecked")
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> ActionFuture<Response> execute(final Action<Request, Response, RequestBuilder, Client> action, final Request request) {
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<Response>>() {
@Override
public ActionFuture<Response> doWithNode(DiscoveryNode node) throws ElasticsearchException {
return proxy.execute(node, request);
}
});
PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
execute(action, request, actionFuture);
return actionFuture;
}
@SuppressWarnings("unchecked")
@ -104,7 +100,7 @@ public class InternalTransportClient extends AbstractClient {
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
@Override
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) throws ElasticsearchException {
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
proxy.execute(node, request, listener);
}
}, listener);

View File

@ -20,9 +20,9 @@
package org.elasticsearch.client.transport.support;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.*;
import org.elasticsearch.action.admin.cluster.ClusterAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.support.AbstractClusterAdminClient;
import org.elasticsearch.client.transport.TransportClientNodesService;
@ -69,13 +69,9 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
@SuppressWarnings("unchecked")
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, ClusterAdminClient>> ActionFuture<Response> execute(final Action<Request, Response, RequestBuilder, ClusterAdminClient> action, final Request request) {
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<Response>>() {
@Override
public ActionFuture<Response> doWithNode(DiscoveryNode node) throws ElasticsearchException {
return proxy.execute(node, request);
}
});
PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
execute(action, request, actionFuture);
return actionFuture;
}
@SuppressWarnings("unchecked")
@ -84,7 +80,7 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
@Override
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) throws ElasticsearchException {
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
proxy.execute(node, request, listener);
}
}, listener);

View File

@ -20,9 +20,9 @@
package org.elasticsearch.client.transport.support;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.*;
import org.elasticsearch.action.admin.indices.IndicesAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.support.AbstractIndicesAdminClient;
import org.elasticsearch.client.transport.TransportClientNodesService;
@ -69,13 +69,9 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
@SuppressWarnings("unchecked")
@Override
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, IndicesAdminClient>> ActionFuture<Response> execute(final Action<Request, Response, RequestBuilder, IndicesAdminClient> action, final Request request) {
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<Response>>() {
@Override
public ActionFuture<Response> doWithNode(DiscoveryNode node) throws ElasticsearchException {
return proxy.execute(node, request);
}
});
PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
execute(action, request, actionFuture);
return actionFuture;
}
@SuppressWarnings("unchecked")
@ -84,7 +80,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
@Override
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) throws ElasticsearchException {
public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
proxy.execute(node, request, listener);
}
}, listener);

View File

@ -69,7 +69,6 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
});
private boolean throwConnectException = false;
private final TransportService.Adapter adapter = new Adapter();
public TransportService(Transport transport, ThreadPool threadPool) {
@ -166,17 +165,6 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
connectionListeners.remove(listener);
}
/**
* Set to <tt>true</tt> to indicate that a {@link ConnectTransportException} should be thrown when
* sending a message (otherwise, it will be passed to the response handler). Defaults to <tt>false</tt>.
* <p/>
* <p>This is useful when logic based on connect failure is needed without having to wrap the handler,
* for example, in case of retries across several nodes.
*/
public void throwConnectException(boolean throwConnectException) {
this.throwConnectException = throwConnectException;
}
public <T extends TransportResponse> TransportFuture<T> submitRequest(DiscoveryNode node, String action, TransportRequest request,
TransportResponseHandler<T> handler) throws TransportException {
return submitRequest(node, action, request, TransportRequestOptions.EMPTY, handler);
@ -190,12 +178,12 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request,
final TransportResponseHandler<T> handler) throws TransportException {
final TransportResponseHandler<T> handler) {
sendRequest(node, action, request, TransportRequestOptions.EMPTY, handler);
}
public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action, final TransportRequest request,
final TransportRequestOptions options, TransportResponseHandler<T> handler) throws TransportException {
final TransportRequestOptions options, TransportResponseHandler<T> handler) {
if (node == null) {
throw new ElasticsearchIllegalStateException("can't send request to a null node");
}
@ -229,12 +217,6 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
});
}
if (throwConnectException) {
if (e instanceof ConnectTransportException) {
throw (ConnectTransportException) e;
}
}
}
}

View File

@ -0,0 +1,192 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transport;
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.*;
import java.io.IOException;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Mock {@link Transport} that lets tests observe how callers react to failing requests.
 * <p>
 * The transport starts in "connect mode": every request is answered straightaway with a
 * {@link NodesInfoResponse} that describes the target node. After {@link #endConnectMode()}
 * has been called, every request records the target node and then randomly either throws a
 * {@link ConnectTransportException}, throws an {@link IllegalStateException}, or completes the
 * registered response handler with a response or with a {@link TransportException}.
 * <p>
 * The outcome of each request is tracked through counters that tests can read back.
 *
 * @param <Response> the type of response handed to the handler on a simulated success
 */
abstract class FailAndRetryMockTransport<Response extends TransportResponse> implements Transport {

    private final Random random;

    // volatile: written through endConnectMode() and read in sendRequest(), which may be invoked
    // from a different thread; the rest of the shared state in this class is thread-safe as well
    private volatile boolean connectMode = true;

    private TransportServiceAdapter transportServiceAdapter;

    private final AtomicInteger connectTransportExceptions = new AtomicInteger();
    private final AtomicInteger failures = new AtomicInteger();
    private final AtomicInteger successes = new AtomicInteger();
    private final Set<DiscoveryNode> triedNodes = new CopyOnWriteArraySet<>();

    /**
     * @param random source used only to seed the private random generator of this transport
     */
    FailAndRetryMockTransport(Random random) {
        this.random = new Random(random.nextLong());
    }

    /**
     * Answers the request directly, without any network interaction.
     * <p>
     * In connect mode the handler registered for {@code requestId} receives a nodes info response
     * for {@code node}. Otherwise the node is added to {@link #triedNodes()} and the outcome is
     * drawn from the random generator, updating the matching counter before it is applied.
     *
     * @throws ConnectTransportException randomly, once connect mode is over
     * @throws IllegalStateException     randomly, once connect mode is over, to simulate a failure
     *                                   that is not a {@link ConnectTransportException}
     */
    @Override
    @SuppressWarnings("unchecked")
    public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
        //we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info
        if (connectMode) {
            TransportResponseHandler transportResponseHandler = transportServiceAdapter.remove(requestId);
            NodeInfo nodeInfo = new NodeInfo(Version.CURRENT, Build.CURRENT, node, null, null, null, null, null, null, null, null, null, null);
            NodesInfoResponse nodesInfoResponse = new NodesInfoResponse(ClusterName.DEFAULT, new NodeInfo[]{nodeInfo});
            transportResponseHandler.handleResponse(nodesInfoResponse);
            return;
        }

        //once connect mode is over, each call records the node and then randomly fails or succeeds
        triedNodes.add(node);

        if (RandomInts.randomInt(random, 100) > 10) {
            connectTransportExceptions.incrementAndGet();
            throw new ConnectTransportException(node, "node not available");
        }

        if (random.nextBoolean()) {
            failures.incrementAndGet();
            //throw whatever exception that is not a subclass of ConnectTransportException
            throw new IllegalStateException();
        }

        //no exception thrown: the outcome is delivered through the registered response handler
        TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
        if (random.nextBoolean()) {
            successes.incrementAndGet();
            handler.handleResponse(newResponse());
        } else {
            failures.incrementAndGet();
            handler.handleException(new TransportException("transport exception"));
        }
    }

    /**
     * Creates the response passed to the handler when a request is answered successfully.
     */
    protected abstract Response newResponse();

    /**
     * Leaves connect mode: from now on requests are answered with random failures or successes.
     */
    public void endConnectMode() {
        this.connectMode = false;
    }

    /**
     * @return how many times a {@link ConnectTransportException} was thrown by {@code sendRequest}
     */
    public int connectTransportExceptions() {
        return connectTransportExceptions.get();
    }

    /**
     * @return how many requests failed with something other than a {@link ConnectTransportException}
     */
    public int failures() {
        return failures.get();
    }

    /**
     * @return how many requests were answered with a response created by {@link #newResponse()}
     */
    public int successes() {
        return successes.get();
    }

    /**
     * @return the nodes that requests were sent to after connect mode ended
     */
    public Set<DiscoveryNode> triedNodes() {
        return triedNodes;
    }

    @Override
    public void transportServiceAdapter(TransportServiceAdapter transportServiceAdapter) {
        this.transportServiceAdapter = transportServiceAdapter;
    }

    // The remaining Transport methods are stubs: they do nothing, return a fixed value
    // or reject the call with UnsupportedOperationException.

    @Override
    public BoundTransportAddress boundAddress() {
        return null;
    }

    @Override
    public TransportAddress[] addressesFromString(String address) throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean addressSupported(Class<? extends TransportAddress> address) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean nodeConnected(DiscoveryNode node) {
        return false;
    }

    @Override
    public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
    }

    @Override
    public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException {
    }

    @Override
    public void disconnectFromNode(DiscoveryNode node) {
    }

    @Override
    public long serverOpen() {
        return 0;
    }

    @Override
    public Lifecycle.State lifecycleState() {
        return null;
    }

    @Override
    public void addLifecycleListener(LifecycleListener listener) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void removeLifecycleListener(LifecycleListener listener) {
        throw new UnsupportedOperationException();
    }

    @Override
    public Transport start() throws ElasticsearchException {
        return null;
    }

    @Override
    public Transport stop() throws ElasticsearchException {
        return null;
    }

    @Override
    public void close() throws ElasticsearchException {
    }
}

View File

@ -0,0 +1,340 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.*;
import org.elasticsearch.action.admin.cluster.ClusterAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.indices.IndicesAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.support.InternalTransportAdminClient;
import org.elasticsearch.client.transport.support.InternalTransportClient;
import org.elasticsearch.client.transport.support.InternalTransportClusterAdminClient;
import org.elasticsearch.client.transport.support.InternalTransportIndicesAdminClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.junit.Test;
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
public class InternalTransportClientTests extends ElasticsearchTestCase {
private static class TestIteration implements Closeable {
private final ThreadPool threadPool;
private final FailAndRetryMockTransport<TestResponse> transport;
private final TransportService transportService;
private final TransportClientNodesService transportClientNodesService;
private final InternalTransportClient internalTransportClient;
private final int nodesCount;
TestIteration() {
threadPool = new ThreadPool("internal-transport-client-tests");
transport = new FailAndRetryMockTransport<TestResponse>(getRandom()) {
@Override
protected TestResponse newResponse() {
return new TestResponse();
}
};
transportService = new TransportService(ImmutableSettings.EMPTY, transport, threadPool);
transportService.start();
transportClientNodesService = new TransportClientNodesService(ImmutableSettings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT);
Map<String, GenericAction> actions = new HashMap<>();
actions.put(NodesInfoAction.NAME, NodesInfoAction.INSTANCE);
actions.put(TestAction.NAME, TestAction.INSTANCE);
actions.put(IndicesAdminTestAction.NAME, IndicesAdminTestAction.INSTANCE);
actions.put(ClusterAdminTestAction.NAME, ClusterAdminTestAction.INSTANCE);
InternalTransportIndicesAdminClient indicesAdminClient = new InternalTransportIndicesAdminClient(ImmutableSettings.EMPTY, transportClientNodesService, transportService, threadPool, actions);
InternalTransportClusterAdminClient clusterAdminClient = new InternalTransportClusterAdminClient(ImmutableSettings.EMPTY, transportClientNodesService, threadPool, transportService, actions);
InternalTransportAdminClient adminClient = new InternalTransportAdminClient(ImmutableSettings.EMPTY, indicesAdminClient, clusterAdminClient);
internalTransportClient = new InternalTransportClient(ImmutableSettings.EMPTY, threadPool, transportService, transportClientNodesService, adminClient, actions);
nodesCount = randomIntBetween(1, 10);
for (int i = 0; i < nodesCount; i++) {
transportClientNodesService.addTransportAddresses(new LocalTransportAddress("node" + i));
}
transport.endConnectMode();
}
public void close() {
threadPool.shutdown();
try {
threadPool.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().isInterrupted();
}
transportService.stop();
transportClientNodesService.close();
internalTransportClient.close();
}
}
@Test
public void testListenerFailures() throws InterruptedException {
int iters = iterations(10, 100);
for (int i = 0; i < iters; i++) {
try(final TestIteration iteration = new TestIteration()) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger finalFailures = new AtomicInteger();
final AtomicReference<Throwable> finalFailure = new AtomicReference<>();
final AtomicReference<TestResponse> response = new AtomicReference<>();
ActionListener<TestResponse> actionListener = new ActionListener<TestResponse>() {
@Override
public void onResponse(TestResponse testResponse) {
response.set(testResponse);
latch.countDown();
}
@Override
public void onFailure(Throwable e) {
finalFailures.incrementAndGet();
finalFailure.set(e);
latch.countDown();
}
};
final AtomicInteger preSendFailures = new AtomicInteger();
Action action = randomFrom(Action.values());
action.execute(iteration, actionListener);
assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true));
//there can be only either one failure that causes the request to fail straightaway or success
assertThat(preSendFailures.get() + iteration.transport.failures() + iteration.transport.successes(), lessThanOrEqualTo(1));
if (iteration.transport.successes() == 1) {
assertThat(finalFailures.get(), equalTo(0));
assertThat(finalFailure.get(), nullValue());
assertThat(response.get(), notNullValue());
} else {
assertThat(finalFailures.get(), equalTo(1));
assertThat(finalFailure.get(), notNullValue());
assertThat(response.get(), nullValue());
if (preSendFailures.get() == 0 && iteration.transport.failures() == 0) {
assertThat(finalFailure.get(), instanceOf(NoNodeAvailableException.class));
}
}
assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount));
assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes()));
}
}
}
@Test
public void testSyncFailures() throws InterruptedException {
int iters = iterations(10, 100);
for (int i = 0; i < iters; i++) {
try(final TestIteration iteration = new TestIteration()) {
TestResponse testResponse = null;
Throwable finalFailure = null;
try {
Action action = randomFrom(Action.values());
ActionFuture<TestResponse> future = action.execute(iteration);
testResponse = future.actionGet();
} catch (Throwable t) {
finalFailure = t;
}
//there can be only either one failure that causes the request to fail straightaway or success
assertThat(iteration.transport.failures() + iteration.transport.successes(), lessThanOrEqualTo(1));
if (iteration.transport.successes() == 1) {
assertThat(finalFailure, nullValue());
assertThat(testResponse, notNullValue());
} else {
assertThat(testResponse, nullValue());
assertThat(finalFailure, notNullValue());
if (iteration.transport.failures() == 0) {
assertThat(finalFailure, instanceOf(NoNodeAvailableException.class));
}
}
assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount));
assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes()));
}
}
}
private static enum Action {
TEST {
@Override
ActionFuture<TestResponse> execute(TestIteration iteration) {
return iteration.internalTransportClient.execute(TestAction.INSTANCE, new TestRequest());
}
@Override
void execute(TestIteration iteration, ActionListener<TestResponse> listener) {
iteration.internalTransportClient.execute(TestAction.INSTANCE, new TestRequest(), listener);
}
},
INDICES_ADMIN {
@Override
ActionFuture<TestResponse> execute(TestIteration iteration) {
return iteration.internalTransportClient.admin().indices().execute(IndicesAdminTestAction.INSTANCE, new TestRequest());
}
@Override
void execute(TestIteration iteration, ActionListener<TestResponse> listener) {
iteration.internalTransportClient.admin().indices().execute(IndicesAdminTestAction.INSTANCE, new TestRequest(), listener);
}
},
CLUSTER_ADMIN {
@Override
ActionFuture<TestResponse> execute(TestIteration iteration) {
return iteration.internalTransportClient.admin().cluster().execute(ClusterAdminTestAction.INSTANCE, new TestRequest());
}
@Override
void execute(TestIteration iteration, ActionListener<TestResponse> listener) {
iteration.internalTransportClient.admin().cluster().execute(ClusterAdminTestAction.INSTANCE, new TestRequest(), listener);
}
};
abstract ActionFuture<TestResponse> execute(TestIteration iteration);
abstract void execute(TestIteration iteration, ActionListener<TestResponse> listener);
}
private static class TestRequest extends ActionRequest {
@Override
public ActionRequestValidationException validate() {
return null;
}
}
private static class TestResponse extends ActionResponse {
}
private static class TestAction extends ClientAction<TestRequest, TestResponse, TestRequestBuilder> {
static final String NAME = "test-action";
static final TestAction INSTANCE = new TestAction(NAME);
private TestAction(String name) {
super(name);
}
@Override
public TestRequestBuilder newRequestBuilder(Client client) {
throw new UnsupportedOperationException();
}
@Override
public TestResponse newResponse() {
return new TestResponse();
}
}
private static class TestRequestBuilder extends ActionRequestBuilder<TestRequest, TestResponse, TestRequestBuilder, Client> {
protected TestRequestBuilder(Client client, TestRequest request) {
super(client, request);
}
@Override
protected void doExecute(ActionListener<TestResponse> listener) {
throw new UnsupportedOperationException();
}
}
private static class IndicesAdminTestAction extends IndicesAction<TestRequest, TestResponse, IndicesAdminTestRequestBuilder> {
static final String NAME = "test-indices-action";
static final IndicesAdminTestAction INSTANCE = new IndicesAdminTestAction(NAME);
private IndicesAdminTestAction(String name) {
super(name);
}
@Override
public IndicesAdminTestRequestBuilder newRequestBuilder(IndicesAdminClient client) {
throw new UnsupportedOperationException();
}
@Override
public TestResponse newResponse() {
return new TestResponse();
}
}
/**
 * Request builder type required by the generic signature of {@link IndicesAdminTestAction};
 * executing it is not supported.
 */
private static class IndicesAdminTestRequestBuilder extends ActionRequestBuilder<TestRequest, TestResponse, IndicesAdminTestRequestBuilder, IndicesAdminClient> {

    protected IndicesAdminTestRequestBuilder(IndicesAdminClient client, TestRequest request) {
        super(client, request);
    }

    /** Not supported: always throws {@link UnsupportedOperationException}. */
    @Override
    protected void doExecute(ActionListener<TestResponse> listener) {
        throw new UnsupportedOperationException();
    }
}
/**
 * Cluster-admin test action registered under {@link #NAME}. Only {@link #newResponse()} is
 * usable; building a request builder is not supported.
 */
private static class ClusterAdminTestAction extends ClusterAction<TestRequest, TestResponse, ClusterAdminTestRequestBuilder> {

    /** Name the action is registered under. */
    static final String NAME = "test-cluster-action";

    /** The only instance of this action. */
    static final ClusterAdminTestAction INSTANCE = new ClusterAdminTestAction(NAME);

    private ClusterAdminTestAction(String actionName) {
        super(actionName);
    }

    /** Not supported: always throws {@link UnsupportedOperationException}. */
    @Override
    public ClusterAdminTestRequestBuilder newRequestBuilder(ClusterAdminClient client) {
        throw new UnsupportedOperationException();
    }

    /** Creates a new, empty {@link TestResponse}. */
    @Override
    public TestResponse newResponse() {
        return new TestResponse();
    }
}
/**
 * Request builder type required by the generic signature of {@link ClusterAdminTestAction};
 * executing it is not supported.
 */
private static class ClusterAdminTestRequestBuilder extends ActionRequestBuilder<TestRequest, TestResponse, ClusterAdminTestRequestBuilder, ClusterAdminClient> {

    protected ClusterAdminTestRequestBuilder(ClusterAdminClient client, TestRequest request) {
        super(client, request);
    }

    /** Not supported: always throws {@link UnsupportedOperationException}. */
    @Override
    protected void doExecute(ActionListener<TestResponse> listener) {
        throw new UnsupportedOperationException();
    }
}
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.junit.Test;
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
/**
 * Tests how {@link TransportClientNodesService} notifies the final listener when requests sent
 * through a {@link FailAndRetryMockTransport} succeed or fail.
 */
public class TransportClientNodesServiceTests extends ElasticsearchTestCase {

    /**
     * Self-contained fixture for a single test iteration: a thread pool, a mock transport, a
     * started {@link TransportService} and a {@link TransportClientNodesService} with a random
     * number (1 to 10) of local transport addresses registered.
     */
    private static class TestIteration implements Closeable {
        private final ThreadPool threadPool;
        private final FailAndRetryMockTransport<TestResponse> transport;
        private final TransportService transportService;
        private final TransportClientNodesService transportClientNodesService;
        private final int nodesCount;

        TestIteration() {
            threadPool = new ThreadPool("transport-client-nodes-service-tests");
            transport = new FailAndRetryMockTransport<TestResponse>(getRandom()) {
                @Override
                protected TestResponse newResponse() {
                    return new TestResponse();
                }
            };
            transportService = new TransportService(ImmutableSettings.EMPTY, transport, threadPool);
            transportService.start();
            transportClientNodesService = new TransportClientNodesService(ImmutableSettings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT);

            nodesCount = randomIntBetween(1, 10);
            for (int i = 0; i < nodesCount; i++) {
                transportClientNodesService.addTransportAddresses(new LocalTransportAddress("node" + i));
            }
            // NOTE(review): presumably switches the mock transport from the connect phase to the
            // request phase - confirm against FailAndRetryMockTransport.
            transport.endConnectMode();
        }

        /**
         * Shuts the thread pool down (waiting up to one second for termination), then stops the
         * transport service and closes the nodes service.
         */
        @Override
        public void close() {
            threadPool.shutdown();
            try {
                threadPool.awaitTermination(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // restore the interrupt status so that callers further up the stack can see it;
                // merely querying it (isInterrupted()) would silently drop the interruption
                Thread.currentThread().interrupt();
            }
            transportService.stop();
            transportClientNodesService.close();
        }
    }

    /**
     * Runs a random number of iterations; each one executes a single request through
     * {@link TransportClientNodesService#execute} and checks that the final listener ends up with
     * either a response and no failure, or exactly one failure and no response, and that the
     * number of tried nodes never exceeds the number of registered nodes.
     */
    @Test
    public void testListenerFailures() throws InterruptedException {
        int iters = iterations(10, 100);
        for (int i = 0; i < iters; i++) {
            try (final TestIteration iteration = new TestIteration()) {
                final CountDownLatch latch = new CountDownLatch(1);
                final AtomicInteger finalFailures = new AtomicInteger();
                final AtomicReference<Throwable> finalFailure = new AtomicReference<>();
                final AtomicReference<TestResponse> response = new AtomicReference<>();
                // final listener: records the outcome and releases the latch
                ActionListener<TestResponse> actionListener = new ActionListener<TestResponse>() {
                    @Override
                    public void onResponse(TestResponse testResponse) {
                        response.set(testResponse);
                        latch.countDown();
                    }

                    @Override
                    public void onFailure(Throwable e) {
                        finalFailures.incrementAndGet();
                        finalFailure.set(e);
                        latch.countDown();
                    }
                };

                final AtomicInteger preSendFailures = new AtomicInteger();

                iteration.transportClientNodesService.execute(new TransportClientNodesService.NodeListenerCallback<TestResponse>() {
                    @Override
                    public void doWithNode(DiscoveryNode node, final ActionListener<TestResponse> retryListener) {
                        if (rarely()) {
                            preSendFailures.incrementAndGet();
                            //throw whatever exception that is not a subclass of ConnectTransportException
                            throw new IllegalArgumentException();
                        }

                        iteration.transportService.sendRequest(node, "action", new TestRequest(), new TransportRequestOptions().withTimeout(50), new BaseTransportResponseHandler<TestResponse>() {
                            @Override
                            public TestResponse newInstance() {
                                return new TestResponse();
                            }

                            @Override
                            public void handleResponse(TestResponse response) {
                                retryListener.onResponse(response);
                            }

                            @Override
                            public void handleException(TransportException exp) {
                                retryListener.onFailure(exp);
                            }

                            @Override
                            public String executor() {
                                return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
                            }
                        });
                    }
                }, actionListener);

                assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true));

                //there can be only either one failure that causes the request to fail straightaway or success
                assertThat(preSendFailures.get() + iteration.transport.failures() + iteration.transport.successes(), lessThanOrEqualTo(1));

                if (iteration.transport.successes() == 1) {
                    assertThat(finalFailures.get(), equalTo(0));
                    assertThat(finalFailure.get(), nullValue());
                    assertThat(response.get(), notNullValue());
                } else {
                    assertThat(finalFailures.get(), equalTo(1));
                    assertThat(finalFailure.get(), notNullValue());
                    assertThat(response.get(), nullValue());
                    // no pre-send failure and no transport failure: the request must have ended
                    // with NoNodeAvailableException
                    if (preSendFailures.get() == 0 && iteration.transport.failures() == 0) {
                        assertThat(finalFailure.get(), instanceOf(NoNodeAvailableException.class));
                    }
                }

                assertThat(iteration.transport.triedNodes().size(), lessThanOrEqualTo(iteration.nodesCount));
                assertThat(iteration.transport.triedNodes().size(), equalTo(iteration.transport.connectTransportExceptions() + iteration.transport.failures() + iteration.transport.successes()));
            }
        }
    }

    /** Empty transport request sent by the test. */
    private static class TestRequest extends TransportRequest {

    }

    /** Empty transport response produced by the mock transport and the response handler. */
    private static class TestResponse extends TransportResponse {

    }
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transport;
import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.transport.TransportService;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
/**
 * Integration test that stops the nodes of the test cluster one at a time and checks that a
 * {@link TransportClient} connected to all of them can still fetch the cluster state after
 * each stop.
 */
@ClusterScope(scope = Scope.TEST, numClientNodes = 0)
public class TransportClientRetryTests extends ElasticsearchIntegrationTest {

    @Test
    public void testRetry() throws IOException, ExecutionException, InterruptedException {
        // collect the publish address of every node's transport service
        Iterable<TransportService> transportServices = internalCluster().getInstances(TransportService.class);
        TransportAddress[] publishAddresses = new TransportAddress[internalCluster().size()];
        int next = 0;
        for (TransportService service : transportServices) {
            publishAddresses[next] = service.boundAddress().publishAddress();
            next++;
        }

        Settings clientSettings = settingsBuilder()
                .put("client.transport.nodes_sampler_interval", "1s")
                .put("name", "transport_client_retry_test")
                .put("node.mode", InternalTestCluster.nodeMode())
                .put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false)
                .put(ClusterName.SETTING, internalCluster().getClusterName())
                .put("config.ignore_system_properties", true)
                .build();

        // predicate accepting any node, so that the node to stop is picked among all of them
        final Predicate<Settings> anyNode = new Predicate<Settings>() {
            @Override
            public boolean apply(Settings input) {
                return true;
            }
        };

        try (TransportClient transportClient = new TransportClient(clientSettings)) {
            transportClient.addTransportAddresses(publishAddresses);
            assertThat(transportClient.connectedNodes().size(), equalTo(internalCluster().size()));

            final int size = cluster().size();
            //kill all nodes one by one, leaving a single master/data node at the end of the loop
            for (int stopped = 1; stopped < size; stopped++) {
                internalCluster().stopRandomNode(anyNode);

                ClusterStateRequest clusterStateRequest = Requests.clusterStateRequest().local(true);
                final ClusterStateResponse stateResponse;
                //use both variants of execute method: with and without listener
                if (randomBoolean()) {
                    stateResponse = transportClient.admin().cluster().state(clusterStateRequest).get();
                } else {
                    PlainListenableActionFuture<ClusterStateResponse> future = new PlainListenableActionFuture<>(clusterStateRequest.listenerThreaded(), transportClient.threadPool());
                    transportClient.admin().cluster().state(clusterStateRequest, future);
                    stateResponse = future.get();
                }
                ClusterState clusterState = stateResponse.getState();

                final int remaining = size - stopped;
                assertThat(clusterState.nodes().size(), greaterThanOrEqualTo(remaining));
                assertThat(transportClient.connectedNodes().size(), greaterThanOrEqualTo(remaining));
            }
        }
    }
}

View File

@ -147,7 +147,7 @@ public final class InternalTestCluster extends TestCluster {
static final boolean DEFAULT_ENABLE_RANDOM_BENCH_NODES = true;
private static final String NODE_MODE = nodeMode();
static final String NODE_MODE = nodeMode();
/* sorted map to make the traversal order reproducible; concurrent since we run checks on it outside of a sync block */
private final NavigableMap<String, NodeAndClient> nodes = new TreeMap<>();
@ -259,7 +259,7 @@ public final class InternalTestCluster extends TestCluster {
}
private static String nodeMode() {
public static String nodeMode() {
Builder builder = ImmutableSettings.builder();
if (Strings.isEmpty(System.getProperty("es.node.mode"))&& Strings.isEmpty(System.getProperty("es.node.local"))) {
return "local"; // default if nothing is specified