TransportClient to automatically retry another node if there is a connection problem

This commit is contained in:
kimchy 2010-02-11 22:34:11 +02:00
parent 847db717c6
commit ade36f026b
5 changed files with 387 additions and 118 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.client.transport;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
@ -31,6 +32,7 @@ import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.cluster.node.Nodes; import org.elasticsearch.cluster.node.Nodes;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.TimeValue;
@ -118,12 +120,21 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
return this; return this;
} }
public Node randomNode() { public <T> T execute(NodeCallback<T> callback) throws ElasticSearchException {
ImmutableList<Node> nodes = this.nodes; ImmutableList<Node> nodes = this.nodes;
if (nodes.isEmpty()) { if (nodes.isEmpty()) {
throw new NoNodeAvailableException(); throw new NoNodeAvailableException();
} }
return nodes.get(Math.abs(randomNodeGenerator.incrementAndGet()) % nodes.size()); int index = randomNodeGenerator.incrementAndGet();
for (int i = 0; i < nodes.size(); i++) {
Node node = nodes.get((index + i) % nodes.size());
try {
return callback.doWithNode(node);
} catch (ConnectTransportException e) {
// retry in this case
}
}
throw new NoNodeAvailableException();
} }
public void close() { public void close() {
@ -205,4 +216,9 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
transportService.nodesRemoved(tempNodes); transportService.nodesRemoved(tempNodes);
} }
} }
public static interface NodeCallback<T> {
T doWithNode(Node node) throws ElasticSearchException;
}
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.client.transport.support; package org.elasticsearch.client.transport.support;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.count.CountRequest; import org.elasticsearch.action.count.CountRequest;
@ -45,6 +46,7 @@ import org.elasticsearch.client.transport.action.get.ClientTransportGetAction;
import org.elasticsearch.client.transport.action.index.ClientTransportIndexAction; import org.elasticsearch.client.transport.action.index.ClientTransportIndexAction;
import org.elasticsearch.client.transport.action.search.ClientTransportSearchAction; import org.elasticsearch.client.transport.action.search.ClientTransportSearchAction;
import org.elasticsearch.client.transport.action.search.ClientTransportSearchScrollAction; import org.elasticsearch.client.transport.action.search.ClientTransportSearchScrollAction;
import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.util.component.AbstractComponent; import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
@ -96,87 +98,178 @@ public class InternalTransportClient extends AbstractComponent implements Client
return adminClient; return adminClient;
} }
@Override public ActionFuture<IndexResponse> index(IndexRequest request) { @Override public ActionFuture<IndexResponse> index(final IndexRequest request) {
return indexAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<IndexResponse>>() {
@Override public ActionFuture<IndexResponse> doWithNode(Node node) throws ElasticSearchException {
return indexAction.submit(node, request);
}
});
} }
@Override public ActionFuture<IndexResponse> index(IndexRequest request, ActionListener<IndexResponse> listener) { @Override public ActionFuture<IndexResponse> index(final IndexRequest request, final ActionListener<IndexResponse> listener) {
return indexAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<IndexResponse>>() {
@Override public ActionFuture<IndexResponse> doWithNode(Node node) throws ElasticSearchException {
return indexAction.submit(node, request, listener);
}
});
} }
@Override public void execIndex(IndexRequest request, ActionListener<IndexResponse> listener) { @Override public void execIndex(final IndexRequest request, final ActionListener<IndexResponse> listener) {
indexAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
indexAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<DeleteResponse> delete(DeleteRequest request) { @Override public ActionFuture<DeleteResponse> delete(final DeleteRequest request) {
return deleteAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<DeleteResponse>>() {
@Override public ActionFuture<DeleteResponse> doWithNode(Node node) throws ElasticSearchException {
return deleteAction.submit(node, request);
}
});
} }
@Override public ActionFuture<DeleteResponse> delete(DeleteRequest request, ActionListener<DeleteResponse> listener) { @Override public ActionFuture<DeleteResponse> delete(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
return deleteAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<DeleteResponse>>() {
@Override public ActionFuture<DeleteResponse> doWithNode(Node node) throws ElasticSearchException {
return deleteAction.submit(node, request, listener);
}
});
} }
@Override public void execDelete(DeleteRequest request, ActionListener<DeleteResponse> listener) { @Override public void execDelete(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
deleteAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
deleteAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<DeleteByQueryResponse> deleteByQuery(DeleteByQueryRequest request) { @Override public ActionFuture<DeleteByQueryResponse> deleteByQuery(final DeleteByQueryRequest request) {
return deleteByQueryAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<DeleteByQueryResponse>>() {
@Override public ActionFuture<DeleteByQueryResponse> doWithNode(Node node) throws ElasticSearchException {
return deleteByQueryAction.submit(node, request);
}
});
} }
@Override public ActionFuture<DeleteByQueryResponse> deleteByQuery(DeleteByQueryRequest request, ActionListener<DeleteByQueryResponse> listener) { @Override public ActionFuture<DeleteByQueryResponse> deleteByQuery(final DeleteByQueryRequest request, final ActionListener<DeleteByQueryResponse> listener) {
return deleteByQueryAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<DeleteByQueryResponse>>() {
@Override public ActionFuture<DeleteByQueryResponse> doWithNode(Node node) throws ElasticSearchException {
return deleteByQueryAction.submit(node, request, listener);
}
});
} }
@Override public void execDeleteByQuery(DeleteByQueryRequest request, ActionListener<DeleteByQueryResponse> listener) { @Override public void execDeleteByQuery(final DeleteByQueryRequest request, final ActionListener<DeleteByQueryResponse> listener) {
deleteByQueryAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
deleteByQueryAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<GetResponse> get(GetRequest request) { @Override public ActionFuture<GetResponse> get(final GetRequest request) {
return getAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<GetResponse>>() {
@Override public ActionFuture<GetResponse> doWithNode(Node node) throws ElasticSearchException {
return getAction.submit(node, request);
}
});
} }
@Override public ActionFuture<GetResponse> get(GetRequest request, ActionListener<GetResponse> listener) { @Override public ActionFuture<GetResponse> get(final GetRequest request, final ActionListener<GetResponse> listener) {
return getAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<GetResponse>>() {
@Override public ActionFuture<GetResponse> doWithNode(Node node) throws ElasticSearchException {
return getAction.submit(node, request, listener);
}
});
} }
@Override public void execGet(GetRequest request, ActionListener<GetResponse> listener) { @Override public void execGet(final GetRequest request, final ActionListener<GetResponse> listener) {
getAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
@Override public Object doWithNode(Node node) throws ElasticSearchException {
getAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<CountResponse> count(CountRequest request) { @Override public ActionFuture<CountResponse> count(final CountRequest request) {
return countAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<CountResponse>>() {
@Override public ActionFuture<CountResponse> doWithNode(Node node) throws ElasticSearchException {
return countAction.submit(node, request);
}
});
} }
@Override public ActionFuture<CountResponse> count(CountRequest request, ActionListener<CountResponse> listener) { @Override public ActionFuture<CountResponse> count(final CountRequest request, final ActionListener<CountResponse> listener) {
return countAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<CountResponse>>() {
@Override public ActionFuture<CountResponse> doWithNode(Node node) throws ElasticSearchException {
return countAction.submit(node, request, listener);
}
});
} }
@Override public void execCount(CountRequest request, ActionListener<CountResponse> listener) { @Override public void execCount(final CountRequest request, final ActionListener<CountResponse> listener) {
countAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
countAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<SearchResponse> search(SearchRequest request) { @Override public ActionFuture<SearchResponse> search(final SearchRequest request) {
return searchAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<SearchResponse>>() {
@Override public ActionFuture<SearchResponse> doWithNode(Node node) throws ElasticSearchException {
return searchAction.submit(node, request);
}
});
} }
@Override public ActionFuture<SearchResponse> search(SearchRequest request, ActionListener<SearchResponse> listener) { @Override public ActionFuture<SearchResponse> search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
return searchAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<SearchResponse>>() {
@Override public ActionFuture<SearchResponse> doWithNode(Node node) throws ElasticSearchException {
return searchAction.submit(node, request, listener);
}
});
} }
@Override public void execSearch(SearchRequest request, ActionListener<SearchResponse> listener) { @Override public void execSearch(final SearchRequest request, final ActionListener<SearchResponse> listener) {
searchAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
@Override public Object doWithNode(Node node) throws ElasticSearchException {
searchAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<SearchResponse> searchScroll(SearchScrollRequest request) { @Override public ActionFuture<SearchResponse> searchScroll(final SearchScrollRequest request) {
return searchScrollAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<SearchResponse>>() {
@Override public ActionFuture<SearchResponse> doWithNode(Node node) throws ElasticSearchException {
return searchScrollAction.submit(node, request);
}
});
} }
@Override public ActionFuture<SearchResponse> searchScroll(SearchScrollRequest request, ActionListener<SearchResponse> listener) { @Override public ActionFuture<SearchResponse> searchScroll(final SearchScrollRequest request, final ActionListener<SearchResponse> listener) {
return searchScrollAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<SearchResponse>>() {
@Override public ActionFuture<SearchResponse> doWithNode(Node node) throws ElasticSearchException {
return searchScrollAction.submit(node, request, listener);
}
});
} }
@Override public void execSearchScroll(SearchScrollRequest request, ActionListener<SearchResponse> listener) { @Override public void execSearchScroll(final SearchScrollRequest request, final ActionListener<SearchResponse> listener) {
searchScrollAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
@Override public Object doWithNode(Node node) throws ElasticSearchException {
searchScrollAction.execute(node, request, listener);
return null;
}
});
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.client.transport.support; package org.elasticsearch.client.transport.support;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
@ -39,6 +40,7 @@ import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.Cl
import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction; import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.single.ClientTransportSinglePingAction; import org.elasticsearch.client.transport.action.admin.cluster.ping.single.ClientTransportSinglePingAction;
import org.elasticsearch.client.transport.action.admin.cluster.state.ClientTransportClusterStateAction; import org.elasticsearch.client.transport.action.admin.cluster.state.ClientTransportClusterStateAction;
import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.util.component.AbstractComponent; import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
@ -72,63 +74,128 @@ public class InternalTransportClusterAdminClient extends AbstractComponent imple
this.broadcastPingAction = broadcastPingAction; this.broadcastPingAction = broadcastPingAction;
} }
@Override public ActionFuture<ClusterStateResponse> state(ClusterStateRequest request) { @Override public ActionFuture<ClusterStateResponse> state(final ClusterStateRequest request) {
return clusterStateAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<ClusterStateResponse>>() {
@Override public ActionFuture<ClusterStateResponse> doWithNode(Node node) throws ElasticSearchException {
return clusterStateAction.submit(node, request);
}
});
} }
@Override public ActionFuture<ClusterStateResponse> state(ClusterStateRequest request, ActionListener<ClusterStateResponse> listener) { @Override public ActionFuture<ClusterStateResponse> state(final ClusterStateRequest request, final ActionListener<ClusterStateResponse> listener) {
return clusterStateAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<ClusterStateResponse>>() {
@Override public ActionFuture<ClusterStateResponse> doWithNode(Node node) throws ElasticSearchException {
return clusterStateAction.submit(node, request, listener);
}
});
} }
@Override public void execState(ClusterStateRequest request, ActionListener<ClusterStateResponse> listener) { @Override public void execState(final ClusterStateRequest request, final ActionListener<ClusterStateResponse> listener) {
clusterStateAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
clusterStateAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<SinglePingResponse> ping(SinglePingRequest request) { @Override public ActionFuture<SinglePingResponse> ping(final SinglePingRequest request) {
return singlePingAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<SinglePingResponse>>() {
@Override public ActionFuture<SinglePingResponse> doWithNode(Node node) throws ElasticSearchException {
return singlePingAction.submit(node, request);
}
});
} }
@Override public ActionFuture<SinglePingResponse> ping(SinglePingRequest request, ActionListener<SinglePingResponse> listener) { @Override public ActionFuture<SinglePingResponse> ping(final SinglePingRequest request, final ActionListener<SinglePingResponse> listener) {
return singlePingAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<SinglePingResponse>>() {
@Override public ActionFuture<SinglePingResponse> doWithNode(Node node) throws ElasticSearchException {
return singlePingAction.submit(node, request, listener);
}
});
} }
@Override public void execPing(SinglePingRequest request, ActionListener<SinglePingResponse> listener) { @Override public void execPing(final SinglePingRequest request, final ActionListener<SinglePingResponse> listener) {
singlePingAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
singlePingAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<BroadcastPingResponse> ping(BroadcastPingRequest request) { @Override public ActionFuture<BroadcastPingResponse> ping(final BroadcastPingRequest request) {
return broadcastPingAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<BroadcastPingResponse>>() {
@Override public ActionFuture<BroadcastPingResponse> doWithNode(Node node) throws ElasticSearchException {
return broadcastPingAction.submit(node, request);
}
});
} }
@Override public ActionFuture<BroadcastPingResponse> ping(BroadcastPingRequest request, ActionListener<BroadcastPingResponse> listener) { @Override public ActionFuture<BroadcastPingResponse> ping(final BroadcastPingRequest request, final ActionListener<BroadcastPingResponse> listener) {
return broadcastPingAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<BroadcastPingResponse>>() {
@Override public ActionFuture<BroadcastPingResponse> doWithNode(Node node) throws ElasticSearchException {
return broadcastPingAction.submit(node, request, listener);
}
});
} }
@Override public void execPing(BroadcastPingRequest request, ActionListener<BroadcastPingResponse> listener) { @Override public void execPing(final BroadcastPingRequest request, final ActionListener<BroadcastPingResponse> listener) {
broadcastPingAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
broadcastPingAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<ReplicationPingResponse> ping(ReplicationPingRequest request) { @Override public ActionFuture<ReplicationPingResponse> ping(final ReplicationPingRequest request) {
return replicationPingAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<ReplicationPingResponse>>() {
@Override public ActionFuture<ReplicationPingResponse> doWithNode(Node node) throws ElasticSearchException {
return replicationPingAction.submit(node, request);
}
});
} }
@Override public ActionFuture<ReplicationPingResponse> ping(ReplicationPingRequest request, ActionListener<ReplicationPingResponse> listener) { @Override public ActionFuture<ReplicationPingResponse> ping(final ReplicationPingRequest request, final ActionListener<ReplicationPingResponse> listener) {
return replicationPingAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<ReplicationPingResponse>>() {
@Override public ActionFuture<ReplicationPingResponse> doWithNode(Node node) throws ElasticSearchException {
return replicationPingAction.submit(node, request, listener);
}
});
} }
@Override public void execPing(ReplicationPingRequest request, ActionListener<ReplicationPingResponse> listener) { @Override public void execPing(final ReplicationPingRequest request, final ActionListener<ReplicationPingResponse> listener) {
replicationPingAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
replicationPingAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<NodesInfoResponse> nodesInfo(NodesInfoRequest request) { @Override public ActionFuture<NodesInfoResponse> nodesInfo(final NodesInfoRequest request) {
return nodesInfoAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<NodesInfoResponse>>() {
@Override public ActionFuture<NodesInfoResponse> doWithNode(Node node) throws ElasticSearchException {
return nodesInfoAction.submit(node, request);
}
});
} }
@Override public ActionFuture<NodesInfoResponse> nodesInfo(NodesInfoRequest request, ActionListener<NodesInfoResponse> listener) { @Override public ActionFuture<NodesInfoResponse> nodesInfo(final NodesInfoRequest request, final ActionListener<NodesInfoResponse> listener) {
return nodesInfoAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<NodesInfoResponse>>() {
@Override public ActionFuture<NodesInfoResponse> doWithNode(Node node) throws ElasticSearchException {
return nodesInfoAction.submit(node, request, listener);
}
});
} }
@Override public void execNodesInfo(NodesInfoRequest request, ActionListener<NodesInfoResponse> listener) { @Override public void execNodesInfo(final NodesInfoRequest request, final ActionListener<NodesInfoResponse> listener) {
nodesInfoAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
nodesInfoAction.execute(node, request, listener);
return null;
}
});
} }
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.client.transport.support; package org.elasticsearch.client.transport.support;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@ -45,6 +46,7 @@ import org.elasticsearch.client.transport.action.admin.indices.gateway.snapshot.
import org.elasticsearch.client.transport.action.admin.indices.mapping.create.ClientTransportCreateMappingAction; import org.elasticsearch.client.transport.action.admin.indices.mapping.create.ClientTransportCreateMappingAction;
import org.elasticsearch.client.transport.action.admin.indices.refresh.ClientTransportRefreshAction; import org.elasticsearch.client.transport.action.admin.indices.refresh.ClientTransportRefreshAction;
import org.elasticsearch.client.transport.action.admin.indices.status.ClientTransportIndicesStatusAction; import org.elasticsearch.client.transport.action.admin.indices.status.ClientTransportIndicesStatusAction;
import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.util.component.AbstractComponent; import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
@ -85,87 +87,178 @@ public class InternalTransportIndicesAdminClient extends AbstractComponent imple
this.gatewaySnapshotAction = gatewaySnapshotAction; this.gatewaySnapshotAction = gatewaySnapshotAction;
} }
@Override public ActionFuture<IndicesStatusResponse> status(IndicesStatusRequest request) { @Override public ActionFuture<IndicesStatusResponse> status(final IndicesStatusRequest request) {
return indicesStatusAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<IndicesStatusResponse>>() {
@Override public ActionFuture<IndicesStatusResponse> doWithNode(Node node) throws ElasticSearchException {
return indicesStatusAction.submit(node, request);
}
});
} }
@Override public ActionFuture<IndicesStatusResponse> status(IndicesStatusRequest request, ActionListener<IndicesStatusResponse> listener) { @Override public ActionFuture<IndicesStatusResponse> status(final IndicesStatusRequest request, final ActionListener<IndicesStatusResponse> listener) {
return indicesStatusAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<IndicesStatusResponse>>() {
@Override public ActionFuture<IndicesStatusResponse> doWithNode(Node node) throws ElasticSearchException {
return indicesStatusAction.submit(node, request, listener);
}
});
} }
@Override public void execStatus(IndicesStatusRequest request, ActionListener<IndicesStatusResponse> listener) { @Override public void execStatus(final IndicesStatusRequest request, final ActionListener<IndicesStatusResponse> listener) {
indicesStatusAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
indicesStatusAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<CreateIndexResponse> create(CreateIndexRequest request) { @Override public ActionFuture<CreateIndexResponse> create(final CreateIndexRequest request) {
return createIndexAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<CreateIndexResponse>>() {
@Override public ActionFuture<CreateIndexResponse> doWithNode(Node node) throws ElasticSearchException {
return createIndexAction.submit(node, request);
}
});
} }
@Override public ActionFuture<CreateIndexResponse> create(CreateIndexRequest request, ActionListener<CreateIndexResponse> listener) { @Override public ActionFuture<CreateIndexResponse> create(final CreateIndexRequest request, final ActionListener<CreateIndexResponse> listener) {
return createIndexAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<CreateIndexResponse>>() {
@Override public ActionFuture<CreateIndexResponse> doWithNode(Node node) throws ElasticSearchException {
return createIndexAction.submit(node, request, listener);
}
});
} }
@Override public void execCreate(CreateIndexRequest request, ActionListener<CreateIndexResponse> listener) { @Override public void execCreate(final CreateIndexRequest request, final ActionListener<CreateIndexResponse> listener) {
createIndexAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
@Override public Object doWithNode(Node node) throws ElasticSearchException {
createIndexAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<DeleteIndexResponse> delete(DeleteIndexRequest request) { @Override public ActionFuture<DeleteIndexResponse> delete(final DeleteIndexRequest request) {
return deleteIndexAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<DeleteIndexResponse>>() {
@Override public ActionFuture<DeleteIndexResponse> doWithNode(Node node) throws ElasticSearchException {
return deleteIndexAction.submit(node, request);
}
});
} }
@Override public ActionFuture<DeleteIndexResponse> delete(DeleteIndexRequest request, ActionListener<DeleteIndexResponse> listener) { @Override public ActionFuture<DeleteIndexResponse> delete(final DeleteIndexRequest request, final ActionListener<DeleteIndexResponse> listener) {
return deleteIndexAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<DeleteIndexResponse>>() {
@Override public ActionFuture<DeleteIndexResponse> doWithNode(Node node) throws ElasticSearchException {
return deleteIndexAction.submit(node, request, listener);
}
});
} }
@Override public void execDelete(DeleteIndexRequest request, ActionListener<DeleteIndexResponse> listener) { @Override public void execDelete(final DeleteIndexRequest request, final ActionListener<DeleteIndexResponse> listener) {
deleteIndexAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
@Override public Object doWithNode(Node node) throws ElasticSearchException {
deleteIndexAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<RefreshResponse> refresh(RefreshRequest request) { @Override public ActionFuture<RefreshResponse> refresh(final RefreshRequest request) {
return refreshAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<RefreshResponse>>() {
@Override public ActionFuture<RefreshResponse> doWithNode(Node node) throws ElasticSearchException {
return refreshAction.submit(node, request);
}
});
} }
@Override public ActionFuture<RefreshResponse> refresh(RefreshRequest request, ActionListener<RefreshResponse> listener) { @Override public ActionFuture<RefreshResponse> refresh(final RefreshRequest request, final ActionListener<RefreshResponse> listener) {
return refreshAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<RefreshResponse>>() {
@Override public ActionFuture<RefreshResponse> doWithNode(Node node) throws ElasticSearchException {
return refreshAction.submit(node, request, listener);
}
});
} }
@Override public void execRefresh(RefreshRequest request, ActionListener<RefreshResponse> listener) { @Override public void execRefresh(final RefreshRequest request, final ActionListener<RefreshResponse> listener) {
refreshAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
refreshAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<FlushResponse> flush(FlushRequest request) { @Override public ActionFuture<FlushResponse> flush(final FlushRequest request) {
return flushAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<FlushResponse>>() {
@Override public ActionFuture<FlushResponse> doWithNode(Node node) throws ElasticSearchException {
return flushAction.submit(node, request);
}
});
} }
@Override public ActionFuture<FlushResponse> flush(FlushRequest request, ActionListener<FlushResponse> listener) { @Override public ActionFuture<FlushResponse> flush(final FlushRequest request, final ActionListener<FlushResponse> listener) {
return flushAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<FlushResponse>>() {
@Override public ActionFuture<FlushResponse> doWithNode(Node node) throws ElasticSearchException {
return flushAction.submit(node, request, listener);
}
});
} }
@Override public void execFlush(FlushRequest request, ActionListener<FlushResponse> listener) { @Override public void execFlush(final FlushRequest request, final ActionListener<FlushResponse> listener) {
flushAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
@Override public Object doWithNode(Node node) throws ElasticSearchException {
flushAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<CreateMappingResponse> createMapping(CreateMappingRequest request) { @Override public ActionFuture<CreateMappingResponse> createMapping(final CreateMappingRequest request) {
return createMappingAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<CreateMappingResponse>>() {
@Override public ActionFuture<CreateMappingResponse> doWithNode(Node node) throws ElasticSearchException {
return createMappingAction.submit(node, request);
}
});
} }
@Override public ActionFuture<CreateMappingResponse> createMapping(CreateMappingRequest request, ActionListener<CreateMappingResponse> listener) { @Override public ActionFuture<CreateMappingResponse> createMapping(final CreateMappingRequest request, final ActionListener<CreateMappingResponse> listener) {
return createMappingAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<CreateMappingResponse>>() {
@Override public ActionFuture<CreateMappingResponse> doWithNode(Node node) throws ElasticSearchException {
return createMappingAction.submit(node, request, listener);
}
});
} }
@Override public void execCreateMapping(CreateMappingRequest request, ActionListener<CreateMappingResponse> listener) { @Override public void execCreateMapping(final CreateMappingRequest request, final ActionListener<CreateMappingResponse> listener) {
createMappingAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Void>() {
@Override public Void doWithNode(Node node) throws ElasticSearchException {
createMappingAction.execute(node, request, listener);
return null;
}
});
} }
@Override public ActionFuture<GatewaySnapshotResponse> gatewaySnapshot(GatewaySnapshotRequest request) { @Override public ActionFuture<GatewaySnapshotResponse> gatewaySnapshot(final GatewaySnapshotRequest request) {
return gatewaySnapshotAction.submit(nodesService.randomNode(), request); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<GatewaySnapshotResponse>>() {
@Override public ActionFuture<GatewaySnapshotResponse> doWithNode(Node node) throws ElasticSearchException {
return gatewaySnapshotAction.submit(node, request);
}
});
} }
@Override public ActionFuture<GatewaySnapshotResponse> gatewaySnapshot(GatewaySnapshotRequest request, ActionListener<GatewaySnapshotResponse> listener) { @Override public ActionFuture<GatewaySnapshotResponse> gatewaySnapshot(final GatewaySnapshotRequest request, final ActionListener<GatewaySnapshotResponse> listener) {
return gatewaySnapshotAction.submit(nodesService.randomNode(), request, listener); return nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<GatewaySnapshotResponse>>() {
@Override public ActionFuture<GatewaySnapshotResponse> doWithNode(Node node) throws ElasticSearchException {
return gatewaySnapshotAction.submit(node, request, listener);
}
});
} }
@Override public void execGatewaySnapshot(GatewaySnapshotRequest request, ActionListener<GatewaySnapshotResponse> listener) { @Override public void execGatewaySnapshot(final GatewaySnapshotRequest request, final ActionListener<GatewaySnapshotResponse> listener) {
gatewaySnapshotAction.execute(nodesService.randomNode(), request, listener); nodesService.execute(new TransportClientNodesService.NodeCallback<Object>() {
@Override public Object doWithNode(Node node) throws ElasticSearchException {
gatewaySnapshotAction.execute(node, request, listener);
return null;
}
});
} }
} }

View File

@ -29,10 +29,10 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.server.internal.InternalServer; import org.elasticsearch.server.internal.InternalServer;
import org.elasticsearch.test.integration.AbstractServersTests; import org.elasticsearch.test.integration.AbstractServersTests;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.settings.ImmutableSettings; import org.elasticsearch.util.settings.ImmutableSettings;
import org.elasticsearch.util.transport.TransportAddress; import org.elasticsearch.util.transport.TransportAddress;
@ -163,7 +163,7 @@ public class SimpleSingleTransportClientTests extends AbstractServersTests {
try { try {
client.index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet(); client.index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"))).actionGet();
assert false : "should fail..."; assert false : "should fail...";
} catch (ConnectTransportException e) { } catch (NoNodeAvailableException e) {
// all is well // all is well
} }
} }