Java API TransportClient can fail on remote node shutdown instead of retrying the next connected node under heavy load, closes #1229.
This commit is contained in:
parent
0d755472d3
commit
fc6e0dd037
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.client.transport;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.TransportActions;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||
|
@ -143,6 +144,10 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
throw new NoNodeAvailableException();
|
||||
}
|
||||
int index = randomNodeGenerator.incrementAndGet();
|
||||
if (index < 0) {
|
||||
index = 0;
|
||||
randomNodeGenerator.set(0);
|
||||
}
|
||||
for (int i = 0; i < nodes.size(); i++) {
|
||||
DiscoveryNode node = nodes.get((index + i) % nodes.size());
|
||||
try {
|
||||
|
@ -154,6 +159,62 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
throw new NoNodeAvailableException();
|
||||
}
|
||||
|
||||
public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) throws ElasticSearchException {
|
||||
ImmutableList<DiscoveryNode> nodes = this.nodes;
|
||||
if (nodes.isEmpty()) {
|
||||
throw new NoNodeAvailableException();
|
||||
}
|
||||
int index = randomNodeGenerator.incrementAndGet();
|
||||
if (index < 0) {
|
||||
index = 0;
|
||||
randomNodeGenerator.set(0);
|
||||
}
|
||||
RetryListener<Response> retryListener = new RetryListener<Response>(callback, listener, nodes, index);
|
||||
try {
|
||||
callback.doWithNode(nodes.get((index) % nodes.size()), retryListener);
|
||||
} catch (ConnectTransportException e) {
|
||||
retryListener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static class RetryListener<Response> implements ActionListener<Response> {
|
||||
private final NodeListenerCallback<Response> callback;
|
||||
private final ActionListener<Response> listener;
|
||||
private final ImmutableList<DiscoveryNode> nodes;
|
||||
private final int index;
|
||||
|
||||
private volatile int i;
|
||||
|
||||
public RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener, ImmutableList<DiscoveryNode> nodes, int index) {
|
||||
this.callback = callback;
|
||||
this.listener = listener;
|
||||
this.nodes = nodes;
|
||||
this.index = index;
|
||||
}
|
||||
|
||||
@Override public void onResponse(Response response) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override public void onFailure(Throwable e) {
|
||||
if (e instanceof ConnectTransportException) {
|
||||
int i = ++this.i;
|
||||
if (i == nodes.size()) {
|
||||
listener.onFailure(new NoNodeAvailableException());
|
||||
} else {
|
||||
try {
|
||||
callback.doWithNode(nodes.get((index + i) % nodes.size()), this);
|
||||
} catch (Exception e1) {
|
||||
// retry the next one...
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void close() {
|
||||
closed = true;
|
||||
nodesSamplerFuture.cancel(true);
|
||||
|
@ -292,4 +353,9 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
|
||||
T doWithNode(DiscoveryNode node) throws ElasticSearchException;
|
||||
}
|
||||
|
||||
public static interface NodeListenerCallback<Response> {
|
||||
|
||||
void doWithNode(DiscoveryNode node, ActionListener<Response> listener) throws ElasticSearchException;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -139,12 +139,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
|
|||
}
|
||||
|
||||
@Override public void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<IndexResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<IndexResponse> listener) throws ElasticSearchException {
|
||||
indexAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<DeleteResponse> delete(final DeleteRequest request) {
|
||||
|
@ -156,12 +155,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
|
|||
}
|
||||
|
||||
@Override public void delete(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<DeleteResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<DeleteResponse> listener) throws ElasticSearchException {
|
||||
deleteAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<BulkResponse> bulk(final BulkRequest request) {
|
||||
|
@ -173,12 +171,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
|
|||
}
|
||||
|
||||
@Override public void bulk(final BulkRequest request, final ActionListener<BulkResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<BulkResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<BulkResponse> listener) throws ElasticSearchException {
|
||||
bulkAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<DeleteByQueryResponse> deleteByQuery(final DeleteByQueryRequest request) {
|
||||
|
@ -190,12 +187,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
|
|||
}
|
||||
|
||||
@Override public void deleteByQuery(final DeleteByQueryRequest request, final ActionListener<DeleteByQueryResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<DeleteByQueryResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<DeleteByQueryResponse> listener) throws ElasticSearchException {
|
||||
deleteByQueryAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<GetResponse> get(final GetRequest request) {
|
||||
|
@ -207,12 +203,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
|
|||
}
|
||||
|
||||
@Override public void get(final GetRequest request, final ActionListener<GetResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<GetResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<GetResponse> listener) throws ElasticSearchException {
|
||||
getAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<MultiGetResponse> multiGet(final MultiGetRequest request) {
|
||||
|
@ -224,12 +219,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
|
|||
}
|
||||
|
||||
@Override public void multiGet(final MultiGetRequest request, final ActionListener<MultiGetResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<MultiGetResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<MultiGetResponse> listener) throws ElasticSearchException {
|
||||
multiGetAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<CountResponse> count(final CountRequest request) {
|
||||
|
@ -241,12 +235,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
|
|||
}
|
||||
|
||||
@Override public void count(final CountRequest request, final ActionListener<CountResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<CountResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<CountResponse> listener) throws ElasticSearchException {
|
||||
countAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<SearchResponse> search(final SearchRequest request) {
|
||||
|
@ -258,12 +251,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
|
|||
}
|
||||
|
||||
@Override public void search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<SearchResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<SearchResponse> listener) throws ElasticSearchException {
|
||||
searchAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<SearchResponse> searchScroll(final SearchScrollRequest request) {
|
||||
|
@ -275,12 +267,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
|
|||
}
|
||||
|
||||
@Override public void searchScroll(final SearchScrollRequest request, final ActionListener<SearchResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<SearchResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<SearchResponse> listener) throws ElasticSearchException {
|
||||
searchScrollAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<SearchResponse> moreLikeThis(final MoreLikeThisRequest request) {
|
||||
|
@ -292,12 +283,11 @@ public class InternalTransportClient extends AbstractClient implements InternalC
|
|||
}
|
||||
|
||||
@Override public void moreLikeThis(final MoreLikeThisRequest request, final ActionListener<SearchResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<SearchResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<SearchResponse> listener) throws ElasticSearchException {
|
||||
moreLikeThisAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<PercolateResponse> percolate(final PercolateRequest request) {
|
||||
|
@ -309,11 +299,10 @@ public class InternalTransportClient extends AbstractClient implements InternalC
|
|||
}
|
||||
|
||||
@Override public void percolate(final PercolateRequest request, final ActionListener<PercolateResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<PercolateResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<PercolateResponse> listener) throws ElasticSearchException {
|
||||
percolateAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -114,12 +114,11 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
|
|||
}
|
||||
|
||||
@Override public void health(final ClusterHealthRequest request, final ActionListener<ClusterHealthResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<ClusterHealthResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<ClusterHealthResponse> listener) throws ElasticSearchException {
|
||||
clusterHealthAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<ClusterStateResponse> state(final ClusterStateRequest request) {
|
||||
|
@ -131,12 +130,11 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
|
|||
}
|
||||
|
||||
@Override public void state(final ClusterStateRequest request, final ActionListener<ClusterStateResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<ClusterStateResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<ClusterStateResponse> listener) throws ElasticSearchException {
|
||||
clusterStateAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<SinglePingResponse> ping(final SinglePingRequest request) {
|
||||
|
@ -148,12 +146,11 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
|
|||
}
|
||||
|
||||
@Override public void ping(final SinglePingRequest request, final ActionListener<SinglePingResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<SinglePingResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<SinglePingResponse> listener) throws ElasticSearchException {
|
||||
singlePingAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<BroadcastPingResponse> ping(final BroadcastPingRequest request) {
|
||||
|
@ -165,12 +162,11 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
|
|||
}
|
||||
|
||||
@Override public void ping(final BroadcastPingRequest request, final ActionListener<BroadcastPingResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<BroadcastPingResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<BroadcastPingResponse> listener) throws ElasticSearchException {
|
||||
broadcastPingAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<ReplicationPingResponse> ping(final ReplicationPingRequest request) {
|
||||
|
@ -182,12 +178,11 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
|
|||
}
|
||||
|
||||
@Override public void ping(final ReplicationPingRequest request, final ActionListener<ReplicationPingResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<ReplicationPingResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<ReplicationPingResponse> listener) throws ElasticSearchException {
|
||||
replicationPingAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<NodesInfoResponse> nodesInfo(final NodesInfoRequest request) {
|
||||
|
@ -199,12 +194,11 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
|
|||
}
|
||||
|
||||
@Override public void nodesInfo(final NodesInfoRequest request, final ActionListener<NodesInfoResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<NodesInfoResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<NodesInfoResponse> listener) throws ElasticSearchException {
|
||||
nodesInfoAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<NodesStatsResponse> nodesStats(final NodesStatsRequest request) {
|
||||
|
@ -216,12 +210,11 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
|
|||
}
|
||||
|
||||
@Override public void nodesStats(final NodesStatsRequest request, final ActionListener<NodesStatsResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<NodesStatsResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<NodesStatsResponse> listener) throws ElasticSearchException {
|
||||
nodesStatsAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<NodesShutdownResponse> nodesShutdown(final NodesShutdownRequest request) {
|
||||
|
@ -233,12 +226,11 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
|
|||
}
|
||||
|
||||
@Override public void nodesShutdown(final NodesShutdownRequest request, final ActionListener<NodesShutdownResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<Void>>() {
|
||||
@Override public ActionFuture<Void> doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<NodesShutdownResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<NodesShutdownResponse> listener) throws ElasticSearchException {
|
||||
nodesShutdownAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<NodesRestartResponse> nodesRestart(final NodesRestartRequest request) {
|
||||
|
@ -250,11 +242,10 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
|
|||
}
|
||||
|
||||
@Override public void nodesRestart(final NodesRestartRequest request, final ActionListener<NodesRestartResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<Void>>() {
|
||||
@Override public ActionFuture<Void> doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<NodesRestartResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<NodesRestartResponse> listener) throws ElasticSearchException {
|
||||
nodesRestartAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -179,12 +179,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void exists(final IndicesExistsRequest request, final ActionListener<IndicesExistsResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<IndicesExistsResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<IndicesExistsResponse> listener) throws ElasticSearchException {
|
||||
indicesExistsAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<IndicesStatusResponse> status(final IndicesStatusRequest request) {
|
||||
|
@ -196,12 +195,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void status(final IndicesStatusRequest request, final ActionListener<IndicesStatusResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<IndicesStatusResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<IndicesStatusResponse> listener) throws ElasticSearchException {
|
||||
indicesStatusAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<IndicesSegmentResponse> segments(final IndicesSegmentsRequest request) {
|
||||
|
@ -213,12 +211,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void segments(final IndicesSegmentsRequest request, final ActionListener<IndicesSegmentResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<IndicesSegmentResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<IndicesSegmentResponse> listener) throws ElasticSearchException {
|
||||
indicesSegmentsAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<CreateIndexResponse> create(final CreateIndexRequest request) {
|
||||
|
@ -230,12 +227,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void create(final CreateIndexRequest request, final ActionListener<CreateIndexResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<CreateIndexResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<CreateIndexResponse> listener) throws ElasticSearchException {
|
||||
createIndexAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<DeleteIndexResponse> delete(final DeleteIndexRequest request) {
|
||||
|
@ -247,12 +243,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void delete(final DeleteIndexRequest request, final ActionListener<DeleteIndexResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<DeleteIndexResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<DeleteIndexResponse> listener) throws ElasticSearchException {
|
||||
deleteIndexAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<CloseIndexResponse> close(final CloseIndexRequest request) {
|
||||
|
@ -264,12 +259,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void close(final CloseIndexRequest request, final ActionListener<CloseIndexResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<CloseIndexResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<CloseIndexResponse> listener) throws ElasticSearchException {
|
||||
closeIndexAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<OpenIndexResponse> open(final OpenIndexRequest request) {
|
||||
|
@ -281,12 +275,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void open(final OpenIndexRequest request, final ActionListener<OpenIndexResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<OpenIndexResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<OpenIndexResponse> listener) throws ElasticSearchException {
|
||||
openIndexAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<RefreshResponse> refresh(final RefreshRequest request) {
|
||||
|
@ -298,12 +291,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void refresh(final RefreshRequest request, final ActionListener<RefreshResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<RefreshResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<RefreshResponse> listener) throws ElasticSearchException {
|
||||
refreshAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<FlushResponse> flush(final FlushRequest request) {
|
||||
|
@ -315,12 +307,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void flush(final FlushRequest request, final ActionListener<FlushResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<FlushResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<FlushResponse> listener) throws ElasticSearchException {
|
||||
flushAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<OptimizeResponse> optimize(final OptimizeRequest request) {
|
||||
|
@ -332,12 +323,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void optimize(final OptimizeRequest request, final ActionListener<OptimizeResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<Void>>() {
|
||||
@Override public ActionFuture<Void> doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<OptimizeResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<OptimizeResponse> listener) throws ElasticSearchException {
|
||||
optimizeAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<PutMappingResponse> putMapping(final PutMappingRequest request) {
|
||||
|
@ -349,12 +339,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void putMapping(final PutMappingRequest request, final ActionListener<PutMappingResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<PutMappingResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<PutMappingResponse> listener) throws ElasticSearchException {
|
||||
putMappingAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<DeleteMappingResponse> deleteMapping(final DeleteMappingRequest request) {
|
||||
|
@ -366,12 +355,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void deleteMapping(final DeleteMappingRequest request, final ActionListener<DeleteMappingResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<DeleteMappingResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<DeleteMappingResponse> listener) throws ElasticSearchException {
|
||||
deleteMappingAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<GatewaySnapshotResponse> gatewaySnapshot(final GatewaySnapshotRequest request) {
|
||||
|
@ -383,12 +371,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void gatewaySnapshot(final GatewaySnapshotRequest request, final ActionListener<GatewaySnapshotResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<GatewaySnapshotResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<GatewaySnapshotResponse> listener) throws ElasticSearchException {
|
||||
gatewaySnapshotAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<IndicesAliasesResponse> aliases(final IndicesAliasesRequest request) {
|
||||
|
@ -400,12 +387,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void aliases(final IndicesAliasesRequest request, final ActionListener<IndicesAliasesResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<IndicesAliasesResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<IndicesAliasesResponse> listener) throws ElasticSearchException {
|
||||
indicesAliasesAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<ClearIndicesCacheResponse> clearCache(final ClearIndicesCacheRequest request) {
|
||||
|
@ -417,12 +403,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void clearCache(final ClearIndicesCacheRequest request, final ActionListener<ClearIndicesCacheResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<ClearIndicesCacheResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<ClearIndicesCacheResponse> listener) throws ElasticSearchException {
|
||||
clearIndicesCacheAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<UpdateSettingsResponse> updateSettings(final UpdateSettingsRequest request) {
|
||||
|
@ -434,12 +419,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void updateSettings(final UpdateSettingsRequest request, final ActionListener<UpdateSettingsResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
|
||||
@Override public Void doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<UpdateSettingsResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<UpdateSettingsResponse> listener) throws ElasticSearchException {
|
||||
updateSettingsAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<AnalyzeResponse> analyze(final AnalyzeRequest request) {
|
||||
|
@ -451,12 +435,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void analyze(final AnalyzeRequest request, final ActionListener<AnalyzeResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<AnalyzeResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<AnalyzeResponse> listener) throws ElasticSearchException {
|
||||
analyzeAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<PutIndexTemplateResponse> putTemplate(final PutIndexTemplateRequest request) {
|
||||
|
@ -468,12 +451,11 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void putTemplate(final PutIndexTemplateRequest request, final ActionListener<PutIndexTemplateResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<PutIndexTemplateResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<PutIndexTemplateResponse> listener) throws ElasticSearchException {
|
||||
putIndexTemplateAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
|
||||
@Override public ActionFuture<DeleteIndexTemplateResponse> deleteTemplate(final DeleteIndexTemplateRequest request) {
|
||||
|
@ -485,11 +467,10 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
|
|||
}
|
||||
|
||||
@Override public void deleteTemplate(final DeleteIndexTemplateRequest request, final ActionListener<DeleteIndexTemplateResponse> listener) {
|
||||
nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
|
||||
@Override public Object doWithNode(DiscoveryNode node) throws ElasticSearchException {
|
||||
nodesService.execute(new TransportClientNodesService.NodeListenerCallback<DeleteIndexTemplateResponse>() {
|
||||
@Override public void doWithNode(DiscoveryNode node, ActionListener<DeleteIndexTemplateResponse> listener) throws ElasticSearchException {
|
||||
deleteIndexTemplateAction.execute(node, request, listener);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, listener);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.test.stress.client;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.client.transport.TransportClient;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.NodeBuilder;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ClientFailover {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Node[] nodes = new Node[3];
|
||||
for (int i = 0; i < nodes.length; i++) {
|
||||
nodes[i] = NodeBuilder.nodeBuilder().node();
|
||||
}
|
||||
|
||||
final TransportClient client = new TransportClient()
|
||||
.addTransportAddress(new InetSocketTransportAddress("localhost", 9300))
|
||||
.addTransportAddress(new InetSocketTransportAddress("localhost", 9301))
|
||||
.addTransportAddress(new InetSocketTransportAddress("localhost", 9302));
|
||||
|
||||
final AtomicBoolean done = new AtomicBoolean();
|
||||
final AtomicLong indexed = new AtomicLong();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Thread indexer = new Thread(new Runnable() {
|
||||
@Override public void run() {
|
||||
while (!done.get()) {
|
||||
try {
|
||||
client.prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
|
||||
indexed.incrementAndGet();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
indexer.start();
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
int index = i % nodes.length;
|
||||
nodes[index].close();
|
||||
|
||||
ClusterHealthResponse health = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
|
||||
if (health.timedOut()) {
|
||||
System.err.println("timed out on health");
|
||||
}
|
||||
|
||||
nodes[index] = NodeBuilder.nodeBuilder().node();
|
||||
|
||||
health = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
|
||||
if (health.timedOut()) {
|
||||
System.err.println("timed out on health");
|
||||
}
|
||||
}
|
||||
|
||||
latch.await();
|
||||
|
||||
// TODO add verification to the number of indexed docs
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue