mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-22 12:56:53 +00:00
better handling of failed transport address passed in discovery (still requires more work)
This commit is contained in:
parent
cc9ff90199
commit
bcb46a1747
@ -173,7 +173,7 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent<Discovery> impl
|
||||
try {
|
||||
channel.send(new Message(channel.getView().getCreator(), channel.getAddress(), nodeMessagePayload()));
|
||||
addressSet = true;
|
||||
logger.debug("Sent address [{}] to master [{}]", transportService.boundAddress().publishAddress(), channel.getView().getCreator());
|
||||
logger.debug("Sent (initial) node information to master [{}], node [{}]", channel.getView().getCreator(), localNode);
|
||||
} catch (Exception e) {
|
||||
logger.warn("Can't send address to master [" + channel.getView().getCreator() + "] will try again later...", e);
|
||||
}
|
||||
@ -255,16 +255,21 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent<Discovery> impl
|
||||
logger.debug("Received node information from [{}], node [{}]", msg.getSrc(), newNode);
|
||||
}
|
||||
|
||||
clusterService.submitStateUpdateTask("jgroups-disco-receive(from node[" + newNode + "])", new ClusterStateUpdateTask() {
|
||||
@Override public ClusterState execute(ClusterState currentState) {
|
||||
if (currentState.nodes().nodeExists(newNode.id())) {
|
||||
// no change, the node already exists in the cluster
|
||||
logger.warn("Received an address [{}] for an existing node [{}]", newNode.address(), newNode);
|
||||
return currentState;
|
||||
if (!transportService.addressSupported(newNode.address().getClass())) {
|
||||
// TODO, what should we do now? Maybe inform that node that its crap?
|
||||
logger.warn("Received a wrong address type from [" + msg.getSrc() + "], ignoring... (received_address[" + newNode.address() + ")");
|
||||
} else {
|
||||
clusterService.submitStateUpdateTask("jgroups-disco-receive(from node[" + newNode + "])", new ClusterStateUpdateTask() {
|
||||
@Override public ClusterState execute(ClusterState currentState) {
|
||||
if (currentState.nodes().nodeExists(newNode.id())) {
|
||||
// no change, the node already exists in the cluster
|
||||
logger.warn("Received an address [{}] for an existing node [{}]", newNode.address(), newNode);
|
||||
return currentState;
|
||||
}
|
||||
return newClusterStateBuilder().state(currentState).nodes(currentState.nodes().newNode(newNode)).build();
|
||||
}
|
||||
return newClusterStateBuilder().state(currentState).nodes(currentState.nodes().newNode(newNode)).build();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("Can't read address from cluster member [" + msg.getSrc() + "] message [" + msg.getClass().getName() + "/" + msg + "]", e);
|
||||
}
|
||||
@ -290,7 +295,7 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent<Discovery> impl
|
||||
if (!addressSet) {
|
||||
try {
|
||||
channel.send(new Message(newView.getCreator(), channel.getAddress(), nodeMessagePayload()));
|
||||
logger.debug("Sent address [{}] to master [{}]", localNode.address(), newView.getCreator());
|
||||
logger.debug("Sent (view) node information to master [{}], node [{}]", newView.getCreator(), localNode);
|
||||
addressSet = true;
|
||||
} catch (Exception e) {
|
||||
logger.warn("Can't send address to master [" + newView.getCreator() + "] will try again later...", e);
|
||||
@ -331,7 +336,7 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent<Discovery> impl
|
||||
}
|
||||
|
||||
if (!foundMe) {
|
||||
logger.warn("Disconnected from cluster, resending address [{}] to master [{}]", localNode.address(), newView.getCreator());
|
||||
logger.warn("Disconnected from cluster, resending to master [{}], node [{}]", newView.getCreator(), localNode);
|
||||
try {
|
||||
channel.send(new Message(newView.getCreator(), channel.getAddress(), nodeMessagePayload()));
|
||||
addressSet = true;
|
||||
|
@ -23,11 +23,12 @@ import org.elasticsearch.cluster.node.Node;
|
||||
import org.elasticsearch.util.component.LifecycleComponent;
|
||||
import org.elasticsearch.util.io.stream.Streamable;
|
||||
import org.elasticsearch.util.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.util.transport.TransportAddress;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public interface Transport extends LifecycleComponent<Transport> {
|
||||
|
||||
@ -64,6 +65,11 @@ public interface Transport extends LifecycleComponent<Transport> {
|
||||
|
||||
BoundTransportAddress boundAddress();
|
||||
|
||||
/**
|
||||
* Is the address type supported.
|
||||
*/
|
||||
boolean addressSupported(Class<? extends TransportAddress> address);
|
||||
|
||||
void nodesAdded(Iterable<Node> nodes);
|
||||
|
||||
void nodesRemoved(Iterable<Node> nodes);
|
||||
|
@ -28,6 +28,7 @@ import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashMapLong;
|
||||
import org.elasticsearch.util.io.stream.Streamable;
|
||||
import org.elasticsearch.util.settings.Settings;
|
||||
import org.elasticsearch.util.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.util.transport.TransportAddress;
|
||||
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
@ -87,6 +88,10 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
||||
transport.close();
|
||||
}
|
||||
|
||||
public boolean addressSupported(Class<? extends TransportAddress> address) {
|
||||
return transport.addressSupported(address);
|
||||
}
|
||||
|
||||
public BoundTransportAddress boundAddress() {
|
||||
return transport.boundAddress();
|
||||
}
|
||||
|
@ -67,6 +67,10 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
@Override public boolean addressSupported(Class<? extends TransportAddress> address) {
|
||||
return LocalTransportAddress.class.equals(address);
|
||||
}
|
||||
|
||||
@Override protected void doStart() throws ElasticSearchException {
|
||||
localAddress = new LocalTransportAddress(Long.toString(transportAddressIdGenerator.incrementAndGet()));
|
||||
transports.put(localAddress, this);
|
||||
|
@ -328,6 +328,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
@Override protected void doClose() throws ElasticSearchException {
|
||||
}
|
||||
|
||||
@Override public boolean addressSupported(Class<? extends TransportAddress> address) {
|
||||
return InetSocketTransportAddress.class.equals(address);
|
||||
}
|
||||
|
||||
@Override public BoundTransportAddress boundAddress() {
|
||||
return this.boundAddress;
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ package org.elasticsearch.util.transport;
|
||||
* the address the transport is bounded on, the the published one represents the one clients should
|
||||
* communicate on.
|
||||
*
|
||||
* @author kimchy (Shay Banon)
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class BoundTransportAddress {
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user