[CLIENT] Add internal liveness action

This commit adds a very lightweight action to the transport
serivce that allows to fetch clustername and the discovery node
from a node. This is used by transport clients to test liveness of
a node without using the nodesinfo API which can be blocking if management
threadpools are busy.

Closes #8763
This commit is contained in:
Simon Willnauer 2014-12-04 10:49:20 +01:00
parent ab001cf8e3
commit ab0e3a6db2
8 changed files with 182 additions and 20 deletions

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAct
import org.elasticsearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction; import org.elasticsearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction; import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartAction; import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartAction;
import org.elasticsearch.action.admin.cluster.node.restart.TransportNodesRestartAction; import org.elasticsearch.action.admin.cluster.node.restart.TransportNodesRestartAction;
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownAction; import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownAction;
@ -224,7 +225,6 @@ public class ActionModule extends AbstractModule {
actionFilterMultibinder.addBinding().to(actionFilter); actionFilterMultibinder.addBinding().to(actionFilter);
} }
bind(ActionFilters.class).asEagerSingleton(); bind(ActionFilters.class).asEagerSingleton();
registerAction(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class); registerAction(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class); registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
registerAction(NodesShutdownAction.INSTANCE, TransportNodesShutdownAction.class); registerAction(NodesShutdownAction.INSTANCE, TransportNodesShutdownAction.class);
@ -332,10 +332,10 @@ public class ActionModule extends AbstractModule {
for (Map.Entry<String, ActionEntry> entry : actions.entrySet()) { for (Map.Entry<String, ActionEntry> entry : actions.entrySet()) {
actionsBinder.addBinding(entry.getKey()).toInstance(entry.getValue().action); actionsBinder.addBinding(entry.getKey()).toInstance(entry.getValue().action);
} }
// register GenericAction -> transportAction Map that can be injected to instances. // register GenericAction -> transportAction Map that can be injected to instances.
// also register any supporting classes // also register any supporting classes
if (!proxy) { if (!proxy) {
bind(TransportLivenessAction.class).asEagerSingleton();
MapBinder<GenericAction, TransportAction> transportActionsBinder MapBinder<GenericAction, TransportAction> transportActionsBinder
= MapBinder.newMapBinder(binder(), GenericAction.class, TransportAction.class); = MapBinder.newMapBinder(binder(), GenericAction.class, TransportAction.class);
for (Map.Entry<String, ActionEntry> entry : actions.entrySet()) { for (Map.Entry<String, ActionEntry> entry : actions.entrySet()) {

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.liveness;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
/**
* Transport level private response for the transport handler registered under
* {@value org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction#NAME}
*/
public final class LivenessRequest extends ActionRequest<LivenessRequest> {
@Override
public ActionRequestValidationException validate() {
return null;
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.liveness;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* Transport level private response for the transport handler registered under
* {@value org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction#NAME}
*/
public final class LivenessResponse extends ActionResponse {
private DiscoveryNode node;
private ClusterName clusterName;
public LivenessResponse() {
}
public LivenessResponse(ClusterName clusterName, DiscoveryNode node) {
this.node = node;
this.clusterName = clusterName;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
clusterName = ClusterName.readClusterName(in);
if (in.readBoolean()) {
node = DiscoveryNode.readNode(in);
} else {
node = null;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
clusterName.writeTo(out);
out.writeOptionalStreamable(node);
}
public ClusterName getClusterName() {
return clusterName;
}
public DiscoveryNode getDiscoveryNode() {
return node;
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.liveness;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
public final class TransportLivenessAction extends BaseTransportRequestHandler<LivenessRequest> {
private final ClusterService clusterService;
private final ClusterName clusterName;
public static final String NAME = "cluster:monitor/nodes/liveness";
@Inject
public TransportLivenessAction(ClusterName clusterName,
ClusterService clusterService, TransportService transportService) {
this.clusterService = clusterService;
this.clusterName = clusterName;
transportService.registerHandler(NAME, this);
}
@Override
public LivenessRequest newInstance() {
return new LivenessRequest();
}
@Override
public void messageReceived(LivenessRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(new LivenessResponse(clusterName, clusterService.localNode()));
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}

View File

@ -28,8 +28,9 @@ import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.liveness.LivenessRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
@ -347,21 +348,21 @@ public class TransportClientNodesService extends AbstractComponent {
} }
} }
try { try {
NodesInfoResponse nodeInfo = transportService.submitRequest(listedNode, NodesInfoAction.NAME, LivenessResponse livenessResponse = transportService.submitRequest(listedNode, TransportLivenessAction.NAME,
headers.applyTo(Requests.nodesInfoRequest("_local").clear()), headers.applyTo(new LivenessRequest()),
TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout), TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout),
new FutureTransportResponseHandler<NodesInfoResponse>() { new FutureTransportResponseHandler<LivenessResponse>() {
@Override @Override
public NodesInfoResponse newInstance() { public LivenessResponse newInstance() {
return new NodesInfoResponse(); return new LivenessResponse();
} }
}).txGet(); }).txGet();
if (!ignoreClusterName && !clusterName.equals(nodeInfo.getClusterName())) { if (!ignoreClusterName && !clusterName.equals(livenessResponse.getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName); logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
newFilteredNodes.add(listedNode); newFilteredNodes.add(listedNode);
} else if (nodeInfo.getNodes().length != 0) { } else if (livenessResponse.getDiscoveryNode() != null) {
// use discovered information but do keep the original transport address, so people can control which address is exactly used. // use discovered information but do keep the original transport address, so people can control which address is exactly used.
DiscoveryNode nodeWithInfo = nodeInfo.getNodes()[0].getNode(); DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
newNodes.add(new DiscoveryNode(nodeWithInfo.name(), nodeWithInfo.id(), nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.address(), nodeWithInfo.attributes(), nodeWithInfo.version())); newNodes.add(new DiscoveryNode(nodeWithInfo.name(), nodeWithInfo.id(), nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.address(), nodeWithInfo.attributes(), nodeWithInfo.version()));
} else { } else {
// although we asked for one node, our target may not have completed initialization yet and doesn't have cluster nodes // although we asked for one node, our target may not have completed initialization yet and doesn't have cluster nodes

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport; package org.elasticsearch.transport;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;

View File

@ -25,6 +25,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.Lifecycle;
@ -63,9 +64,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
//we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info //we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info
if (connectMode) { if (connectMode) {
TransportResponseHandler transportResponseHandler = transportServiceAdapter.remove(requestId); TransportResponseHandler transportResponseHandler = transportServiceAdapter.remove(requestId);
NodeInfo nodeInfo = new NodeInfo(Version.CURRENT, Build.CURRENT, node, null, null, null, null, null, null, null, null, null, null); transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.DEFAULT, node));
NodesInfoResponse nodesInfoResponse = new NodesInfoResponse(ClusterName.DEFAULT, new NodeInfo[]{nodeInfo});
transportResponseHandler.handleResponse(nodesInfoResponse);
return; return;
} }

View File

@ -21,9 +21,8 @@ package org.elasticsearch.client.transport;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.AbstractClientHeadersTests; import org.elasticsearch.client.AbstractClientHeadersTests;
@ -105,9 +104,9 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTests {
@Override @SuppressWarnings("unchecked") @Override @SuppressWarnings("unchecked")
public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler<T> handler) { public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler<T> handler) {
if (NodesInfoAction.NAME.equals(action)) { if (TransportLivenessAction.NAME.equals(action)) {
assertHeaders(request); assertHeaders(request);
((TransportResponseHandler<NodesInfoResponse>) handler).handleResponse(new NodesInfoResponse(ClusterName.DEFAULT, new NodeInfo[0])); ((TransportResponseHandler<LivenessResponse>) handler).handleResponse(new LivenessResponse(ClusterName.DEFAULT, node));
return; return;
} }
if (ClusterStateAction.NAME.equals(action)) { if (ClusterStateAction.NAME.equals(action)) {