diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 9be4f8a977a..9add94d60f1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -21,6 +21,7 @@ package org.elasticsearch.client.transport; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; +import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.client.Requests; @@ -31,6 +32,7 @@ import org.elasticsearch.cluster.node.Node; import org.elasticsearch.cluster.node.Nodes; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportResponseHandler; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportService; import org.elasticsearch.util.TimeValue; @@ -118,12 +120,21 @@ public class TransportClientNodesService extends AbstractComponent implements Cl return this; } - public Node randomNode() { + public T execute(NodeCallback callback) throws ElasticSearchException { ImmutableList nodes = this.nodes; if (nodes.isEmpty()) { throw new NoNodeAvailableException(); } - return nodes.get(Math.abs(randomNodeGenerator.incrementAndGet()) % nodes.size()); + int index = randomNodeGenerator.incrementAndGet(); + for (int i = 0; i < nodes.size(); i++) { + Node node = nodes.get((index + i) % nodes.size()); + try { + return callback.doWithNode(node); + } catch (ConnectTransportException e) { + // retry in this case + } + } + throw new NoNodeAvailableException(); } public void close() { @@ -205,4 +216,9 @@ public class TransportClientNodesService extends AbstractComponent implements Cl transportService.nodesRemoved(tempNodes); } } + + public static interface NodeCallback { + + T doWithNode(Node node) throws ElasticSearchException; + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java index 4bc07c716c9..2debd7e3cf4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClient.java @@ -20,6 +20,7 @@ package org.elasticsearch.client.transport.support; import com.google.inject.Inject; +import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.count.CountRequest; @@ -45,6 +46,7 @@ import org.elasticsearch.client.transport.action.get.ClientTransportGetAction; import org.elasticsearch.client.transport.action.index.ClientTransportIndexAction; import org.elasticsearch.client.transport.action.search.ClientTransportSearchAction; import org.elasticsearch.client.transport.action.search.ClientTransportSearchScrollAction; +import org.elasticsearch.cluster.node.Node; import org.elasticsearch.util.component.AbstractComponent; import org.elasticsearch.util.settings.Settings; @@ -96,87 +98,178 @@ public class InternalTransportClient extends AbstractComponent implements Client return adminClient; } - @Override public ActionFuture index(IndexRequest request) { - return indexAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture index(final IndexRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return indexAction.submit(node, request); + } + }); } - @Override public ActionFuture index(IndexRequest request, ActionListener listener) { - return indexAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture index(final IndexRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return indexAction.submit(node, request, listener); + } + }); } - @Override public void execIndex(IndexRequest request, ActionListener listener) { - indexAction.execute(nodesService.randomNode(), request, listener); + @Override public void execIndex(final IndexRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(Node node) throws ElasticSearchException { + indexAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture delete(DeleteRequest request) { - return deleteAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture delete(final DeleteRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return deleteAction.submit(node, request); + } + }); } - @Override public ActionFuture delete(DeleteRequest request, ActionListener listener) { - return deleteAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture delete(final DeleteRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return deleteAction.submit(node, request, listener); + } + }); } - @Override public void execDelete(DeleteRequest request, ActionListener listener) { - deleteAction.execute(nodesService.randomNode(), request, listener); + @Override public void execDelete(final DeleteRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(Node node) throws ElasticSearchException { + deleteAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture deleteByQuery(DeleteByQueryRequest request) { - return deleteByQueryAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture deleteByQuery(final DeleteByQueryRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return deleteByQueryAction.submit(node, request); + } + }); } - @Override public ActionFuture deleteByQuery(DeleteByQueryRequest request, ActionListener listener) { - return deleteByQueryAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture deleteByQuery(final DeleteByQueryRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return deleteByQueryAction.submit(node, request, listener); + } + }); } - @Override public void execDeleteByQuery(DeleteByQueryRequest request, ActionListener listener) { - deleteByQueryAction.execute(nodesService.randomNode(), request, listener); + @Override public void execDeleteByQuery(final DeleteByQueryRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(Node node) throws ElasticSearchException { + deleteByQueryAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture get(GetRequest request) { - return getAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture get(final GetRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return getAction.submit(node, request); + } + }); } - @Override public ActionFuture get(GetRequest request, ActionListener listener) { - return getAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture get(final GetRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return getAction.submit(node, request, listener); + } + }); } - @Override public void execGet(GetRequest request, ActionListener listener) { - getAction.execute(nodesService.randomNode(), request, listener); + @Override public void execGet(final GetRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Object doWithNode(Node node) throws ElasticSearchException { + getAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture count(CountRequest request) { - return countAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture count(final CountRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return countAction.submit(node, request); + } + }); } - @Override public ActionFuture count(CountRequest request, ActionListener listener) { - return countAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture count(final CountRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return countAction.submit(node, request, listener); + } + }); } - @Override public void execCount(CountRequest request, ActionListener listener) { - countAction.execute(nodesService.randomNode(), request, listener); + @Override public void execCount(final CountRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(Node node) throws ElasticSearchException { + countAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture search(SearchRequest request) { - return searchAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture search(final SearchRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return searchAction.submit(node, request); + } + }); } - @Override public ActionFuture search(SearchRequest request, ActionListener listener) { - return searchAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture search(final SearchRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return searchAction.submit(node, request, listener); + } + }); } - @Override public void execSearch(SearchRequest request, ActionListener listener) { - searchAction.execute(nodesService.randomNode(), request, listener); + @Override public void execSearch(final SearchRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Object doWithNode(Node node) throws ElasticSearchException { + searchAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture searchScroll(SearchScrollRequest request) { - return searchScrollAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture searchScroll(final SearchScrollRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return searchScrollAction.submit(node, request); + } + }); } - @Override public ActionFuture searchScroll(SearchScrollRequest request, ActionListener listener) { - return searchScrollAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture searchScroll(final SearchScrollRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return searchScrollAction.submit(node, request, listener); + } + }); } - @Override public void execSearchScroll(SearchScrollRequest request, ActionListener listener) { - searchScrollAction.execute(nodesService.randomNode(), request, listener); + @Override public void execSearchScroll(final SearchScrollRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Object doWithNode(Node node) throws ElasticSearchException { + searchScrollAction.execute(node, request, listener); + return null; + } + }); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java index ee105f6d43d..8e19e73acd6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java @@ -20,6 +20,7 @@ package org.elasticsearch.client.transport.support; import com.google.inject.Inject; +import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; @@ -39,6 +40,7 @@ import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.Cl import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction; import org.elasticsearch.client.transport.action.admin.cluster.ping.single.ClientTransportSinglePingAction; import org.elasticsearch.client.transport.action.admin.cluster.state.ClientTransportClusterStateAction; +import org.elasticsearch.cluster.node.Node; import org.elasticsearch.util.component.AbstractComponent; import org.elasticsearch.util.settings.Settings; @@ -72,63 +74,128 @@ public class InternalTransportClusterAdminClient extends AbstractComponent imple this.broadcastPingAction = broadcastPingAction; } - @Override public ActionFuture state(ClusterStateRequest request) { - return clusterStateAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture state(final ClusterStateRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return clusterStateAction.submit(node, request); + } + }); } - @Override public ActionFuture state(ClusterStateRequest request, ActionListener listener) { - return clusterStateAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture state(final ClusterStateRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return clusterStateAction.submit(node, request, listener); + } + }); } - @Override public void execState(ClusterStateRequest request, ActionListener listener) { - clusterStateAction.execute(nodesService.randomNode(), request, listener); + @Override public void execState(final ClusterStateRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(Node node) throws ElasticSearchException { + clusterStateAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture ping(SinglePingRequest request) { - return singlePingAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture ping(final SinglePingRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return singlePingAction.submit(node, request); + } + }); } - @Override public ActionFuture ping(SinglePingRequest request, ActionListener listener) { - return singlePingAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture ping(final SinglePingRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return singlePingAction.submit(node, request, listener); + } + }); } - @Override public void execPing(SinglePingRequest request, ActionListener listener) { - singlePingAction.execute(nodesService.randomNode(), request, listener); + @Override public void execPing(final SinglePingRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(Node node) throws ElasticSearchException { + singlePingAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture ping(BroadcastPingRequest request) { - return broadcastPingAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture ping(final BroadcastPingRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return broadcastPingAction.submit(node, request); + } + }); } - @Override public ActionFuture ping(BroadcastPingRequest request, ActionListener listener) { - return broadcastPingAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture ping(final BroadcastPingRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return broadcastPingAction.submit(node, request, listener); + } + }); } - @Override public void execPing(BroadcastPingRequest request, ActionListener listener) { - broadcastPingAction.execute(nodesService.randomNode(), request, listener); + @Override public void execPing(final BroadcastPingRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(Node node) throws ElasticSearchException { + broadcastPingAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture ping(ReplicationPingRequest request) { - return replicationPingAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture ping(final ReplicationPingRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return replicationPingAction.submit(node, request); + } + }); } - @Override public ActionFuture ping(ReplicationPingRequest request, ActionListener listener) { - return replicationPingAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture ping(final ReplicationPingRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return replicationPingAction.submit(node, request, listener); + } + }); } - @Override public void execPing(ReplicationPingRequest request, ActionListener listener) { - replicationPingAction.execute(nodesService.randomNode(), request, listener); + @Override public void execPing(final ReplicationPingRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(Node node) throws ElasticSearchException { + replicationPingAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture nodesInfo(NodesInfoRequest request) { - return nodesInfoAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture nodesInfo(final NodesInfoRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return nodesInfoAction.submit(node, request); + } + }); } - @Override public ActionFuture nodesInfo(NodesInfoRequest request, ActionListener listener) { - return nodesInfoAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture nodesInfo(final NodesInfoRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return nodesInfoAction.submit(node, request, listener); + } + }); } - @Override public void execNodesInfo(NodesInfoRequest request, ActionListener listener) { - nodesInfoAction.execute(nodesService.randomNode(), request, listener); + @Override public void execNodesInfo(final NodesInfoRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(Node node) throws ElasticSearchException { + nodesInfoAction.execute(node, request, listener); + return null; + } + }); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java index 6de359c5ce4..6958f334887 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportIndicesAdminClient.java @@ -20,6 +20,7 @@ package org.elasticsearch.client.transport.support; import com.google.inject.Inject; +import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -45,6 +46,7 @@ import org.elasticsearch.client.transport.action.admin.indices.gateway.snapshot. import org.elasticsearch.client.transport.action.admin.indices.mapping.create.ClientTransportCreateMappingAction; import org.elasticsearch.client.transport.action.admin.indices.refresh.ClientTransportRefreshAction; import org.elasticsearch.client.transport.action.admin.indices.status.ClientTransportIndicesStatusAction; +import org.elasticsearch.cluster.node.Node; import org.elasticsearch.util.component.AbstractComponent; import org.elasticsearch.util.settings.Settings; @@ -85,87 +87,178 @@ public class InternalTransportIndicesAdminClient extends AbstractComponent imple this.gatewaySnapshotAction = gatewaySnapshotAction; } - @Override public ActionFuture status(IndicesStatusRequest request) { - return indicesStatusAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture status(final IndicesStatusRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return indicesStatusAction.submit(node, request); + } + }); } - @Override public ActionFuture status(IndicesStatusRequest request, ActionListener listener) { - return indicesStatusAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture status(final IndicesStatusRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return indicesStatusAction.submit(node, request, listener); + } + }); } - @Override public void execStatus(IndicesStatusRequest request, ActionListener listener) { - indicesStatusAction.execute(nodesService.randomNode(), request, listener); + @Override public void execStatus(final IndicesStatusRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(Node node) throws ElasticSearchException { + indicesStatusAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture create(CreateIndexRequest request) { - return createIndexAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture create(final CreateIndexRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return createIndexAction.submit(node, request); + } + }); } - @Override public ActionFuture create(CreateIndexRequest request, ActionListener listener) { - return createIndexAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture create(final CreateIndexRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return createIndexAction.submit(node, request, listener); + } + }); } - @Override public void execCreate(CreateIndexRequest request, ActionListener listener) { - createIndexAction.execute(nodesService.randomNode(), request, listener); + @Override public void execCreate(final CreateIndexRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Object doWithNode(Node node) throws ElasticSearchException { + createIndexAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture delete(DeleteIndexRequest request) { - return deleteIndexAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture delete(final DeleteIndexRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return deleteIndexAction.submit(node, request); + } + }); } - @Override public ActionFuture delete(DeleteIndexRequest request, ActionListener listener) { - return deleteIndexAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture delete(final DeleteIndexRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return deleteIndexAction.submit(node, request, listener); + } + }); } - @Override public void execDelete(DeleteIndexRequest request, ActionListener listener) { - deleteIndexAction.execute(nodesService.randomNode(), request, listener); + @Override public void execDelete(final DeleteIndexRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Object doWithNode(Node node) throws ElasticSearchException { + deleteIndexAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture refresh(RefreshRequest request) { - return refreshAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture refresh(final RefreshRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return refreshAction.submit(node, request); + } + }); } - @Override public ActionFuture refresh(RefreshRequest request, ActionListener listener) { - return refreshAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture refresh(final RefreshRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return refreshAction.submit(node, request, listener); + } + }); } - @Override public void execRefresh(RefreshRequest request, ActionListener listener) { - refreshAction.execute(nodesService.randomNode(), request, listener); + @Override public void execRefresh(final RefreshRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(Node node) throws ElasticSearchException { + refreshAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture flush(FlushRequest request) { - return flushAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture flush(final FlushRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return flushAction.submit(node, request); + } + }); } - @Override public ActionFuture flush(FlushRequest request, ActionListener listener) { - return flushAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture flush(final FlushRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return flushAction.submit(node, request, listener); + } + }); } - @Override public void execFlush(FlushRequest request, ActionListener listener) { - flushAction.execute(nodesService.randomNode(), request, listener); + @Override public void execFlush(final FlushRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Object doWithNode(Node node) throws ElasticSearchException { + flushAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture createMapping(CreateMappingRequest request) { - return createMappingAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture createMapping(final CreateMappingRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return createMappingAction.submit(node, request); + } + }); } - @Override public ActionFuture createMapping(CreateMappingRequest request, ActionListener listener) { - return createMappingAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture createMapping(final CreateMappingRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return createMappingAction.submit(node, request, listener); + } + }); } - @Override public void execCreateMapping(CreateMappingRequest request, ActionListener listener) { - createMappingAction.execute(nodesService.randomNode(), request, listener); + @Override public void execCreateMapping(final CreateMappingRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Void doWithNode(Node node) throws ElasticSearchException { + createMappingAction.execute(node, request, listener); + return null; + } + }); } - @Override public ActionFuture gatewaySnapshot(GatewaySnapshotRequest request) { - return gatewaySnapshotAction.submit(nodesService.randomNode(), request); + @Override public ActionFuture gatewaySnapshot(final GatewaySnapshotRequest request) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return gatewaySnapshotAction.submit(node, request); + } + }); } - @Override public ActionFuture gatewaySnapshot(GatewaySnapshotRequest request, ActionListener listener) { - return gatewaySnapshotAction.submit(nodesService.randomNode(), request, listener); + @Override public ActionFuture gatewaySnapshot(final GatewaySnapshotRequest request, final ActionListener listener) { + return nodesService.execute(new TransportClientNodesService.NodeCallback>() { + @Override public ActionFuture doWithNode(Node node) throws ElasticSearchException { + return gatewaySnapshotAction.submit(node, request, listener); + } + }); } - @Override public void execGatewaySnapshot(GatewaySnapshotRequest request, ActionListener listener) { - gatewaySnapshotAction.execute(nodesService.randomNode(), request, listener); + @Override public void execGatewaySnapshot(final GatewaySnapshotRequest request, final ActionListener listener) { + nodesService.execute(new TransportClientNodesService.NodeCallback() { + @Override public Object doWithNode(Node node) throws ElasticSearchException { + gatewaySnapshotAction.execute(node, request, listener); + return null; + } + }); } } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/SimpleSingleTransportClientTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/SimpleSingleTransportClientTests.java index acfdf5fe156..1ab0a3d3880 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/SimpleSingleTransportClientTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/client/transport/SimpleSingleTransportClientTests.java @@ -29,10 +29,10 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; +import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.server.internal.InternalServer; import org.elasticsearch.test.integration.AbstractServersTests; -import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportService; import org.elasticsearch.util.settings.ImmutableSettings; import org.elasticsearch.util.transport.TransportAddress; @@ -163,7 +163,7 @@ public class SimpleSingleTransportClientTests extends AbstractServersTests { try { client.index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet(); assert false : "should fail..."; - } catch (ConnectTransportException e) { + } catch (NoNodeAvailableException e) { // all is well } }