zen discovery, support for unicast discovery

This commit is contained in:
kimchy 2010-04-25 12:01:11 +03:00
parent cb0d7d4735
commit 453ede8f57
13 changed files with 395 additions and 32 deletions

View File

@ -73,6 +73,7 @@
<w>traslog</w>
<w>trie</w>
<w>tuple</w>
<w>unicast</w>
<w>unregister</w>
<w>uuid</w>
<w>versioned</w>

View File

@ -25,6 +25,7 @@ import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.transport.TransportAddress;
import java.io.IOException;
import java.util.List;
@ -143,6 +144,15 @@ public class DiscoveryNodes implements Iterable<DiscoveryNode> {
return masterNode();
}
/**
 * Looks up a node by its transport address.
 *
 * @param address the address compared (via <tt>equals</tt>) against each node's address
 * @return the first node whose address equals the given one, or <tt>null</tt> if no node matches
 */
public DiscoveryNode findByAddress(TransportAddress address) {
    for (DiscoveryNode candidate : nodes.values()) {
        if (!candidate.address().equals(address)) {
            // different address, keep looking
            continue;
        }
        return candidate;
    }
    return null;
}
public DiscoveryNodes removeDeadMembers(Set<String> newNodes, String masterNodeId) {
Builder builder = new Builder().masterNodeId(masterNodeId).localNodeId(localNodeId);
for (DiscoveryNode node : this) {

View File

@ -120,7 +120,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
boolean retry = true;
while (retry) {
retry = false;
DiscoveryNode masterNode = pingTillMasterResolved();
DiscoveryNode masterNode = broadBingTillMasterResolved();
if (localNode.equals(masterNode)) {
// we are the master (first)
this.firstMaster = true;
@ -182,7 +182,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
logger.debug("Failed to send leave request to master [{}]", e, latestDiscoNodes.masterNode());
}
} else {
DiscoveryNode[] possibleMasters = electMaster.nextPossibleMasters(latestDiscoNodes.nodes().values(), 3);
DiscoveryNode[] possibleMasters = electMaster.nextPossibleMasters(latestDiscoNodes.nodes().values(), 5);
for (DiscoveryNode possibleMaster : possibleMasters) {
if (localNode.equals(possibleMaster)) {
continue;
@ -365,7 +365,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
}
private DiscoveryNode pingTillMasterResolved() {
private DiscoveryNode broadBingTillMasterResolved() {
while (true) {
ZenPing.PingResponse[] pingResponses = pingService.pingAndWait(initialPingTimeout);
List<DiscoveryNode> pingMasters = newArrayList();

View File

@ -20,6 +20,7 @@
package org.elasticsearch.discovery.zen.ping;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.util.TimeValue;
@ -30,6 +31,7 @@ import org.elasticsearch.util.io.stream.Streamable;
import java.io.IOException;
import static org.elasticsearch.cluster.ClusterName.*;
import static org.elasticsearch.cluster.node.DiscoveryNode.*;
/**
@ -48,16 +50,23 @@ public interface ZenPing extends LifecycleComponent<ZenPing> {
public class PingResponse implements Streamable {
private ClusterName clusterName;
private DiscoveryNode target;
private DiscoveryNode master;
public PingResponse() {
private PingResponse() {
}
public PingResponse(DiscoveryNode target, DiscoveryNode master) {
public PingResponse(DiscoveryNode target, DiscoveryNode master, ClusterName clusterName) {
this.target = target;
this.master = master;
this.clusterName = clusterName;
}
/**
 * Returns the cluster name stored in this ping response.
 */
public ClusterName clusterName() {
    return clusterName;
}
public DiscoveryNode target() {
@ -68,7 +77,14 @@ public interface ZenPing extends LifecycleComponent<ZenPing> {
return master;
}
/**
 * Creates an empty ping response and fills it from the given stream using
 * {@link #readFrom(StreamInput)}.
 *
 * @param in the stream to read the ping response from
 * @return the populated ping response
 * @throws IOException if reading from the stream fails
 */
public static PingResponse readPingResponse(StreamInput in) throws IOException {
    final PingResponse pingResponse = new PingResponse();
    pingResponse.readFrom(in);
    return pingResponse;
}
@Override public void readFrom(StreamInput in) throws IOException {
clusterName = readClusterName(in);
target = readNode(in);
if (in.readBoolean()) {
master = readNode(in);
@ -76,6 +92,7 @@ public interface ZenPing extends LifecycleComponent<ZenPing> {
}
@Override public void writeTo(StreamOutput out) throws IOException {
clusterName.writeTo(out);
target.writeTo(out);
if (master == null) {
out.writeBoolean(false);

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.TimeValue;
@ -49,7 +50,15 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
@Inject public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
super(settings);
this.zenPings = ImmutableList.of(new MulticastZenPing(settings, threadPool, transportService, clusterName));
ImmutableList.Builder<ZenPing> zenPingsBuilder = ImmutableList.builder();
if (componentSettings.getAsBoolean("multicast.enabled", true)) {
zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName));
}
if (componentSettings.getAsArray("unicast.hosts").length > 0) {
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName));
}
this.zenPings = zenPingsBuilder.build();
}
public ImmutableList<? extends ZenPing> zenPings() {

View File

@ -109,7 +109,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
this.bufferSize = componentSettings.getAsInt("buffer_size", 2048);
this.ttl = componentSettings.getAsInt("ttl", 3);
this.transportService.registerHandler(PingResponseRequestHandler.ACTION, new PingResponseRequestHandler());
this.transportService.registerHandler(MulticastPingResponseRequestHandler.ACTION, new MulticastPingResponseRequestHandler());
}
@Override public void setNodesProvider(DiscoveryNodesProvider nodesProvider) {
@ -197,7 +197,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(id);
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
}
}, timeout.millis(), TimeUnit.MILLISECONDS);
}, timeout);
}
private void sendPingRequest(int id) {
@ -224,15 +224,15 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
}
}
private class PingResponseRequestHandler extends BaseTransportRequestHandler<WrappedPingResponse> {
class MulticastPingResponseRequestHandler extends BaseTransportRequestHandler<MulticastPingResponse> {
static final String ACTION = "discovery/zen/multicast";
@Override public WrappedPingResponse newInstance() {
return new WrappedPingResponse();
@Override public MulticastPingResponse newInstance() {
return new MulticastPingResponse();
}
@Override public void messageReceived(WrappedPingResponse request, TransportChannel channel) throws Exception {
@Override public void messageReceived(MulticastPingResponse request, TransportChannel channel) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("[{}] Received {}", request.id, request.pingResponse);
}
@ -246,24 +246,18 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
}
}
class WrappedPingResponse implements Streamable {
static class MulticastPingResponse implements Streamable {
int id;
PingResponse pingResponse;
WrappedPingResponse() {
}
WrappedPingResponse(int id, PingResponse pingResponse) {
this.id = id;
this.pingResponse = pingResponse;
MulticastPingResponse() {
}
@Override public void readFrom(StreamInput in) throws IOException {
id = in.readInt();
pingResponse = new PingResponse();
pingResponse.readFrom(in);
pingResponse = PingResponse.readPingResponse(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -318,12 +312,12 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
// not our cluster, ignore it...
continue;
}
final WrappedPingResponse wrappedPingResponse = new WrappedPingResponse();
wrappedPingResponse.id = id;
wrappedPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode());
final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
multicastPingResponse.id = id;
multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName);
if (logger.isTraceEnabled()) {
logger.trace("[{}] Received ping_request from [{}], sending {}", id, requestingNode, wrappedPingResponse.pingResponse);
logger.trace("[{}] Received ping_request from [{}], sending {}", id, requestingNode, multicastPingResponse.pingResponse);
}
if (!transportService.nodeConnected(requestingNode)) {
@ -336,7 +330,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
} catch (Exception e) {
logger.warn("Failed to connect to requesting node {}", e, requestingNode);
}
transportService.sendRequest(requestingNode, PingResponseRequestHandler.ACTION, wrappedPingResponse, new VoidTransportResponseHandler(false) {
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(false) {
@Override public void handleException(RemoteTransportException exp) {
logger.warn("Failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
@ -344,7 +338,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
}
});
} else {
transportService.sendRequest(requestingNode, PingResponseRequestHandler.ACTION, wrappedPingResponse, new VoidTransportResponseHandler(false) {
transportService.sendRequest(requestingNode, MulticastPingResponseRequestHandler.ACTION, multicastPingResponse, new VoidTransportResponseHandler(false) {
@Override public void handleException(RemoteTransportException exp) {
logger.warn("Failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}

View File

@ -0,0 +1,298 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.ping.unicast;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.concurrent.jsr166y.LinkedTransferQueue;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Lists.*;
import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.*;
import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*;
/**
 * A {@link ZenPing} implementation that discovers nodes by sending ping requests directly to a
 * fixed list of addresses (the <tt>hosts</tt> component setting), plus any nodes returned by
 * {@link #buildDynamicNodes()}.
 *
 * <p>A node answering a ping request also keeps the requester's own ping response around for a
 * while and returns it to later requesters, so nodes pinging the same host learn about each other.
 *
 * @author kimchy (shay.banon)
 */
public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implements ZenPing {

    private final ThreadPool threadPool;

    private final TransportService transportService;

    private final ClusterName clusterName;

    private final String[] hosts;

    // placeholder nodes, one per configured host, used as ping targets
    private final DiscoveryNode[] nodes;

    private volatile DiscoveryNodesProvider nodesProvider;

    private final AtomicInteger pingIdGenerator = new AtomicInteger();

    // responses collected so far, keyed by the id of the ping round they belong to
    private final Map<Integer, ConcurrentMap<DiscoveryNode, PingResponse>> receivedResponses = newConcurrentMap();

    // a list of temporal responses a node will return for a request (holds requests from other nodes)
    private final Queue<PingResponse> temporalResponses = new LinkedTransferQueue<PingResponse>();

    /**
     * Resolves every configured host into a placeholder node and registers the unicast ping
     * request handler with the transport service.
     *
     * @throws ElasticSearchIllegalArgumentException if one of the configured hosts cannot be
     *                                               turned into a transport address
     */
    public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
        super(settings);
        this.threadPool = threadPool;
        this.transportService = transportService;
        this.clusterName = clusterName;

        this.hosts = componentSettings.getAsArray("hosts");
        this.nodes = new DiscoveryNode[hosts.length];
        for (int i = 0; i < hosts.length; i++) {
            try {
                nodes[i] = new DiscoveryNode("#zen_unicast_" + i + "#", transportService.addressFromString(hosts[i]));
            } catch (Exception e) {
                throw new ElasticSearchIllegalArgumentException("Failed to resolve address for [" + hosts[i] + "]", e);
            }
        }

        transportService.registerHandler(UnicastPingRequestHandler.ACTION, new UnicastPingRequestHandler());
    }

    @Override protected void doStart() throws ElasticSearchException {
    }

    @Override protected void doStop() throws ElasticSearchException {
    }

    @Override protected void doClose() throws ElasticSearchException {
        transportService.removeHandler(UnicastPingRequestHandler.ACTION);
    }

    /**
     * Extension point for additional nodes to ping on top of the configured hosts.
     * This implementation returns an empty list.
     */
    protected List<DiscoveryNode> buildDynamicNodes() {
        return ImmutableList.of();
    }

    @Override public void setNodesProvider(DiscoveryNodesProvider nodesProvider) {
        this.nodesProvider = nodesProvider;
    }

    /**
     * Starts a ping round: pings all target nodes immediately, then, once <tt>timeout</tt> has
     * elapsed, pings them a second time (waiting for the answers) and hands everything collected
     * for this round to the listener.
     *
     * @param listener notified with the ping responses gathered during the round
     * @param timeout  delay before the second ping pass and the listener notification
     */
    @Override public void ping(final PingListener listener, final TimeValue timeout) throws ElasticSearchException {
        final int id = pingIdGenerator.incrementAndGet();
        receivedResponses.put(id, new ConcurrentHashMap<DiscoveryNode, PingResponse>());
        try {
            sendPings(id, timeout, false);
        } catch (RuntimeException e) {
            // the round never started, don't leave its (empty) response map behind
            receivedResponses.remove(id);
            throw e;
        }
        threadPool.schedule(new Runnable() {
            @Override public void run() {
                try {
                    sendPings(id, timeout, true);
                } finally {
                    // always complete the round, even if the second pass failed, so the
                    // listener is notified and the response map for this id is released
                    ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(id);
                    listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
                }
            }
        }, timeout);
    }

    /**
     * Sends a ping request to every configured node and every dynamic node.
     *
     * @param id      the id of the ping round, echoed back in each response
     * @param timeout the round timeout; it is sent along with the request and also scales the
     *                per-request timeout (x1.25) and the maximum wait (x5)
     * @param wait    when <tt>true</tt>, blocks until all requests completed or the maximum wait elapsed
     */
    private void sendPings(int id, TimeValue timeout, boolean wait) {
        UnicastPingRequest pingRequest = new UnicastPingRequest();
        pingRequest.id = id;
        pingRequest.timeout = timeout;
        DiscoveryNodes discoNodes = nodesProvider.nodes();
        pingRequest.pingResponse = new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName);

        List<DiscoveryNode> nodesToPing = newArrayList(nodes);
        nodesToPing.addAll(buildDynamicNodes());

        final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
        for (final DiscoveryNode node : nodesToPing) {
            // prefer the node already known under this address; otherwise use the placeholder
            // node and remember to drop the connection opened just for this ping
            final DiscoveryNode knownNode = discoNodes.findByAddress(node.address());
            final DiscoveryNode nodeToSend = knownNode != null ? knownNode : node;
            final boolean disconnect = knownNode == null;

            // make sure we are connected
            try {
                transportService.connectToNode(nodeToSend);
            } catch (ConnectTransportException e) {
                latch.countDown();
                // can't connect to the node
                continue;
            }

            transportService.sendRequest(nodeToSend, UnicastPingRequestHandler.ACTION, pingRequest, TimeValue.timeValueMillis((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() {

                @Override public UnicastPingResponse newInstance() {
                    return new UnicastPingResponse();
                }

                @Override public void handleResponse(UnicastPingResponse response) {
                    try {
                        // the connection was only opened for this ping: drop it exactly once,
                        // independent of how many ping responses came back
                        if (disconnect) {
                            transportService.disconnectFromNode(nodeToSend);
                        }
                        DiscoveryNodes discoveryNodes = nodesProvider.nodes();
                        for (PingResponse pingResponse : response.pingResponses) {
                            if (pingResponse.target().id().equals(discoveryNodes.localNodeId())) {
                                // that's us, ignore
                                continue;
                            }
                            if (!pingResponse.clusterName().equals(clusterName)) {
                                // not part of the cluster; skip only this entry, the remaining
                                // entries (including the responder's own) may still be ours
                                continue;
                            }
                            ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.get(response.id);
                            if (responses == null) {
                                logger.warn("Received ping response with no matching id [{}]", response.id);
                            } else {
                                responses.put(pingResponse.target(), pingResponse);
                            }
                        }
                    } finally {
                        latch.countDown();
                    }
                }

                @Override public void handleException(RemoteTransportException exp) {
                    latch.countDown();
                    if (exp instanceof ConnectTransportException) {
                        // ok, not connected...
                    } else {
                        if (disconnect) {
                            transportService.disconnectFromNode(nodeToSend);
                        }
                        logger.warn("Failed to send ping to [{}]", exp, node);
                    }
                }
            });
        }
        if (wait) {
            try {
                latch.await(timeout.millis() * 5, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // stop waiting, but keep the interrupt visible to the calling thread
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Handles an incoming ping request: remembers the requester's ping response (scheduling its
     * removal after twice the request timeout) and answers with all currently remembered
     * responses plus this node's own.
     */
    private UnicastPingResponse handlePingRequest(final UnicastPingRequest request) {
        temporalResponses.add(request.pingResponse);
        threadPool.schedule(new Runnable() {
            @Override public void run() {
                temporalResponses.remove(request.pingResponse);
            }
        }, request.timeout.millis() * 2, TimeUnit.MILLISECONDS);

        List<PingResponse> pingResponses = newArrayList(temporalResponses);
        DiscoveryNodes discoNodes = nodesProvider.nodes();
        pingResponses.add(new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName));

        UnicastPingResponse unicastPingResponse = new UnicastPingResponse();
        unicastPingResponse.id = request.id;
        unicastPingResponse.pingResponses = pingResponses.toArray(new PingResponse[pingResponses.size()]);

        return unicastPingResponse;
    }

    /**
     * Transport handler answering unicast ping requests via {@link #handlePingRequest}.
     */
    class UnicastPingRequestHandler extends BaseTransportRequestHandler<UnicastPingRequest> {

        static final String ACTION = "discovery/zen/unicast";

        @Override public UnicastPingRequest newInstance() {
            return new UnicastPingRequest();
        }

        @Override public void messageReceived(UnicastPingRequest request, TransportChannel channel) throws Exception {
            channel.sendResponse(handlePingRequest(request));
        }
    }

    /**
     * Ping request: the round id, the requester's timeout and the requester's own ping response.
     */
    static class UnicastPingRequest implements Streamable {

        int id;

        TimeValue timeout;

        PingResponse pingResponse;

        UnicastPingRequest() {
        }

        @Override public void readFrom(StreamInput in) throws IOException {
            id = in.readInt();
            timeout = readTimeValue(in);
            pingResponse = readPingResponse(in);
        }

        @Override public void writeTo(StreamOutput out) throws IOException {
            out.writeInt(id);
            timeout.writeTo(out);
            pingResponse.writeTo(out);
        }
    }

    /**
     * Ping response: the round id of the request and the ping responses known to the responder.
     */
    static class UnicastPingResponse implements Streamable {

        int id;

        PingResponse[] pingResponses;

        UnicastPingResponse() {
        }

        @Override public void readFrom(StreamInput in) throws IOException {
            id = in.readInt();
            pingResponses = new PingResponse[in.readVInt()];
            for (int i = 0; i < pingResponses.length; i++) {
                pingResponses[i] = readPingResponse(in);
            }
        }

        @Override public void writeTo(StreamOutput out) throws IOException {
            out.writeInt(id);
            out.writeVInt(pingResponses.length);
            for (PingResponse pingResponse : pingResponses) {
                pingResponse.writeTo(out);
            }
        }
    }
}

View File

@ -24,16 +24,24 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
/**
* @author kimchy (shay.banon)
*/
public class ConnectTransportException extends TransportException {
public class ConnectTransportException extends RemoteTransportException {
private final DiscoveryNode node;
public ConnectTransportException(DiscoveryNode node, String msg) {
this(node, msg, null);
this(node, msg, null, null);
}
public ConnectTransportException(DiscoveryNode node, String msg, String action) {
this(node, msg, action, null);
}
public ConnectTransportException(DiscoveryNode node, String msg, Throwable cause) {
super(node + ": " + msg, cause);
this(node, msg, null, cause);
}
public ConnectTransportException(DiscoveryNode node, String msg, String action, Throwable cause) {
super(node.name(), node.address(), action, cause);
this.node = node;
}

View File

@ -24,10 +24,10 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
/**
* @author kimchy (shay.banon)
*/
public class NodeDisconnectedTransportException extends RemoteTransportException {
public class NodeDisconnectedTransportException extends ConnectTransportException {
public NodeDisconnectedTransportException(DiscoveryNode node, String action) {
super(node.name(), node.address(), action, null);
super(node, "disconnected", action, null);
}
// @Override public Throwable fillInStackTrace() {

View File

@ -63,8 +63,16 @@ public interface Transport extends LifecycleComponent<Transport> {
void transportServiceAdapter(TransportServiceAdapter service);
/**
* The address the transport is bound on.
*/
BoundTransportAddress boundAddress();
/**
* Returns an address from its string representation.
*/
TransportAddress addressFromString(String address) throws Exception;
/**
* Is the address type supported.
*/

View File

@ -180,6 +180,10 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
return requestIds.getAndIncrement();
}
/**
* Returns a transport address built from its string representation, delegating the
* conversion to the underlying transport.
*
* @param address the string representation of the address
* @throws Exception if the underlying transport fails to convert the string
*/
public TransportAddress addressFromString(String address) throws Exception {
return transport.addressFromString(address);
}
/**
* Registers the given handler under the action name the handler itself reports
* through <tt>action()</tt>.
*/
public void registerHandler(ActionTransportRequestHandler handler) {
registerHandler(handler.action(), handler);
}

View File

@ -70,6 +70,10 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
this.threadPool = threadPool;
}
/**
* Wraps the given string in a {@link LocalTransportAddress}; the string is passed to the
* address constructor as is.
*/
@Override public TransportAddress addressFromString(String address) {
return new LocalTransportAddress(address);
}
/**
* Returns <tt>true</tt> only when the given class is exactly {@link LocalTransportAddress}.
*/
@Override public boolean addressSupported(Class<? extends TransportAddress> address) {
return LocalTransportAddress.class.equals(address);
}

View File

@ -314,6 +314,16 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
/**
* Close hook of the lifecycle; this implementation does nothing.
*/
@Override protected void doClose() throws ElasticSearchException {
}
/**
 * Parses a <tt>host:port</tt> string into an {@link InetSocketTransportAddress}.
 *
 * <p>The port is whatever follows the last <tt>':'</tt>; everything before it is the host.
 * Surrounding whitespace around either part is ignored.
 *
 * @param address the address in <tt>host:port</tt> form
 * @return the address built from the parsed host and port
 * @throws ElasticSearchIllegalStateException if no port separator or no host is present
 * @throws NumberFormatException              if the port part is not an integer
 */
@Override public TransportAddress addressFromString(String address) throws Exception {
    int index = address.lastIndexOf(':');
    if (index == -1) {
        throw new ElasticSearchIllegalStateException("Port must be provided to create inet address from [" + address + "]");
    }
    String host = address.substring(0, index).trim();
    if (host.length() == 0) {
        // NOTE(review): an empty host would otherwise be handed to the address constructor,
        // which presumably resolves it like InetAddress.getByName("") (loopback) - fail fast
        // instead of silently targeting the wrong machine; confirm against InetSocketTransportAddress
        throw new ElasticSearchIllegalStateException("Host must be provided to create inet address from [" + address + "]");
    }
    int port = Integer.parseInt(address.substring(index + 1).trim());
    return new InetSocketTransportAddress(host, port);
}
/**
* Returns <tt>true</tt> only when the given class is exactly {@link InetSocketTransportAddress}.
*/
@Override public boolean addressSupported(Class<? extends TransportAddress> address) {
return InetSocketTransportAddress.class.equals(address);
}