inital support for zen discovery module (multicast discovery implemented)

This commit is contained in:
kimchy 2010-04-24 23:48:45 +03:00
parent 609af0da60
commit cb0d7d4735
44 changed files with 2793 additions and 309 deletions

View File

@ -1,20 +1,24 @@
<component name="ProjectDictionaryState">
<dictionary name="kimchy">
<words>
<w>addr</w>
<w>args</w>
<w>asciifolding</w>
<w>attr</w>
<w>banon</w>
<w>bindhost</w>
<w>birthdate</w>
<w>bool</w>
<w>booleans</w>
<w>camelcase</w>
<w>canonicalhost</w>
<w>checksum</w>
<w>closeable</w>
<w>commitable</w>
<w>committable</w>
<w>configurator</w>
<w>coord</w>
<w>datagram</w>
<w>desc</w>
<w>deserialize</w>
<w>elasticsearch</w>
@ -26,6 +30,7 @@
<w>indices</w>
<w>inet</w>
<w>infos</w>
<w>intf</w>
<w>iter</w>
<w>jgroups</w>
<w>joda</w>
@ -33,12 +38,14 @@
<w>kimchy</w>
<w>lifecycle</w>
<w>linefeeds</w>
<w>loopback</w>
<w>lucene</w>
<w>memcached</w>
<w>metadata</w>
<w>millis</w>
<w>mmap</w>
<w>multi</w>
<w>multicast</w>
<w>multiline</w>
<w>nanos</w>
<w>newcount</w>
@ -49,6 +56,7 @@
<w>pluggable</w>
<w>plugins</w>
<w>porterstem</w>
<w>publishhost</w>
<w>rebalance</w>
<w>sbuf</w>
<w>searchable</w>

View File

@ -61,6 +61,15 @@
<option name="LABEL_INDENT_SIZE" value="0" />
<option name="LABEL_INDENT_ABSOLUTE" value="false" />
</ADDITIONAL_INDENT_OPTIONS>
<ADDITIONAL_INDENT_OPTIONS fileType="php">
<option name="INDENT_SIZE" value="4" />
<option name="CONTINUATION_INDENT_SIZE" value="8" />
<option name="TAB_SIZE" value="4" />
<option name="USE_TAB_CHARACTER" value="false" />
<option name="SMART_TABS" value="false" />
<option name="LABEL_INDENT_SIZE" value="0" />
<option name="LABEL_INDENT_ABSOLUTE" value="false" />
</ADDITIONAL_INDENT_OPTIONS>
<ADDITIONAL_INDENT_OPTIONS fileType="scala">
<option name="INDENT_SIZE" value="2" />
<option name="CONTINUATION_INDENT_SIZE" value="8" />

View File

@ -23,7 +23,7 @@ import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.LifecycleComponent;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public interface ClusterService extends LifecycleComponent<ClusterService> {

View File

@ -319,7 +319,11 @@ public class MetaDataService extends AbstractComponent {
}
// build the updated mapping source
final String updatedMappingSource = existingMapper.buildSource();
logger.info("Index [" + index + "]: Update mapping [" + type + "] (dynamic) with source [" + updatedMappingSource + "]");
if (logger.isDebugEnabled()) {
logger.debug("Index [" + index + "]: Update mapping [" + type + "] (dynamic) with source [" + updatedMappingSource + "]");
} else if (logger.isInfoEnabled()) {
logger.info("Index [" + index + "]: Update mapping [" + type + "] (dynamic)");
}
// publish the new mapping
clusterService.submitStateUpdateTask("update-mapping [" + index + "][" + type + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
@ -391,7 +395,11 @@ public class MetaDataService extends AbstractComponent {
mapping = new Tuple<String, String>(newMapper.type(), newMapper.buildSource());
}
mappings.put(index, mapping);
logger.info("Index [" + index + "]: Put mapping [" + mapping.v1() + "] with source [" + mapping.v2() + "]");
if (logger.isDebugEnabled()) {
logger.debug("Index [" + index + "]: Put mapping [" + mapping.v1() + "] with source [" + mapping.v2() + "]");
} else if (logger.isInfoEnabled()) {
logger.info("Index [" + index + "]: Put mapping [" + mapping.v1() + "]");
}
}
final CountDownLatch latch = new CountDownLatch(clusterService.state().nodes().size() * indices.length);

View File

@ -31,7 +31,7 @@ import org.elasticsearch.discovery.InitialStateDiscoveryListener;
import org.elasticsearch.env.Environment;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.io.HostResolver;
import org.elasticsearch.util.io.NetworkUtils;
import org.elasticsearch.util.io.stream.BytesStreamInput;
import org.elasticsearch.util.io.stream.BytesStreamOutput;
import org.elasticsearch.util.settings.Settings;
@ -109,8 +109,8 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent<Discovery> impl
if (System.getProperty("jgroups.bind_addr") == null) {
// automatically set the bind address based on ElasticSearch default bindings...
try {
InetAddress bindAddress = HostResolver.resolveBindHostAddress(null, settings, HostResolver.LOCAL_IP);
if ((bindAddress instanceof Inet4Address && HostResolver.isIPv4()) || (bindAddress instanceof Inet6Address && !HostResolver.isIPv4())) {
InetAddress bindAddress = NetworkUtils.resolveBindHostAddress(null, settings, NetworkUtils.LOCAL);
if ((bindAddress instanceof Inet4Address && NetworkUtils.isIPv4()) || (bindAddress instanceof Inet6Address && !NetworkUtils.isIPv4())) {
sysPropsSet.put("jgroups.bind_addr", bindAddress.getHostAddress());
System.setProperty("jgroups.bind_addr", bindAddress.getHostAddress());
}

View File

@ -23,7 +23,7 @@ import com.google.inject.AbstractModule;
import org.elasticsearch.discovery.Discovery;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class LocalDiscoveryModule extends AbstractModule {

View File

@ -0,0 +1,30 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen;
import org.elasticsearch.cluster.node.DiscoveryNodes;
/**
* @author kimchy (shay.banon)
*/
public interface DiscoveryNodesProvider {
DiscoveryNodes nodes();
}

View File

@ -0,0 +1,439 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.InitialStateDiscoveryListener;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
import org.elasticsearch.discovery.zen.fd.NodesFaultDetection;
import org.elasticsearch.discovery.zen.membership.MembershipAction;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.UUID;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.settings.Settings;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.collect.Lists.*;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.util.TimeValue.*;
/**
* @author kimchy (shay.banon)
*/
public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implements Discovery, DiscoveryNodesProvider {
private final TransportService transportService;
private final ClusterService clusterService;
private final ClusterName clusterName;
private final ZenPingService pingService;
private final MasterFaultDetection masterFD;
private final NodesFaultDetection nodesFD;
private final PublishClusterStateAction publishClusterState;
private final MembershipAction membership;
private final TimeValue initialPingTimeout;
private final ElectMasterService electMaster;
private DiscoveryNode localNode;
private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<InitialStateDiscoveryListener>();
private volatile boolean master = false;
private volatile boolean firstMaster = false;
private volatile DiscoveryNodes latestDiscoNodes;
private final AtomicBoolean initialStateSent = new AtomicBoolean();
@Inject public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService,
ZenPingService pingService) {
super(settings);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.transportService = transportService;
this.pingService = pingService;
this.initialPingTimeout = componentSettings.getAsTime("initial_ping_timeout", timeValueSeconds(3));
this.electMaster = new ElectMasterService(settings);
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
this.masterFD.addListener(new MasterNodeFailureListener());
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
this.nodesFD.addListener(new NodeFailureListener());
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener());
this.pingService.setNodesProvider(this);
this.membership = new MembershipAction(settings, transportService, new MembershipListener());
}
@Override protected void doStart() throws ElasticSearchException {
localNode = new DiscoveryNode(settings.get("name"), settings.getAsBoolean("node.data", !settings.getAsBoolean("node.client", false)), UUID.randomUUID().toString(), transportService.boundAddress().publishAddress());
pingService.start();
boolean retry = true;
while (retry) {
retry = false;
DiscoveryNode masterNode = pingTillMasterResolved();
if (localNode.equals(masterNode)) {
// we are the master (first)
this.firstMaster = true;
this.master = true;
nodesFD.start(); // start the nodes FD
clusterService.submitStateUpdateTask("zen-disco-initial_connect(master)", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
.localNodeId(localNode.id())
.masterNodeId(localNode.id())
// put our local node
.put(localNode);
// update the fact that we are the master...
latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState).nodes(builder).build();
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
sendInitialStateEventIfNeeded();
}
});
} else {
this.firstMaster = false;
this.master = false;
try {
// first, make sure we can connect to the master
transportService.connectToNode(masterNode);
} catch (Exception e) {
logger.warn("Failed to connect to master [{}], retrying...", e, masterNode);
retry = true;
continue;
}
// send join request
try {
membership.sendJoinRequestBlocking(masterNode, localNode, initialPingTimeout);
} catch (Exception e) {
logger.warn("Failed to send join request to master [{}], retrying...", e, masterNode);
// failed to send the join request, retry
retry = true;
continue;
}
// cool, we found a master, start an FD on it
masterFD.start(masterNode);
}
}
}
@Override protected void doStop() throws ElasticSearchException {
pingService.stop();
if (masterFD.masterNode() != null) {
masterFD.stop();
}
nodesFD.stop();
initialStateSent.set(false);
if (!master) {
try {
membership.sendLeaveRequestBlocking(latestDiscoNodes.masterNode(), localNode, TimeValue.timeValueSeconds(1));
} catch (Exception e) {
logger.debug("Failed to send leave request to master [{}]", e, latestDiscoNodes.masterNode());
}
} else {
DiscoveryNode[] possibleMasters = electMaster.nextPossibleMasters(latestDiscoNodes.nodes().values(), 3);
for (DiscoveryNode possibleMaster : possibleMasters) {
if (localNode.equals(possibleMaster)) {
continue;
}
try {
membership.sendLeaveRequest(latestDiscoNodes.masterNode(), possibleMaster);
} catch (Exception e) {
logger.debug("Failed to send leave request from master [{}] to possible master [{}]", e, latestDiscoNodes.masterNode(), possibleMaster);
}
}
}
master = false;
}
@Override protected void doClose() throws ElasticSearchException {
masterFD.close();
nodesFD.close();
publishClusterState.close();
membership.close();
pingService.close();
}
@Override public void addListener(InitialStateDiscoveryListener listener) {
this.initialStateListeners.add(listener);
}
@Override public void removeListener(InitialStateDiscoveryListener listener) {
this.initialStateListeners.remove(listener);
}
@Override public String nodeDescription() {
return clusterName.value() + "/" + localNode.id();
}
@Override public boolean firstMaster() {
return firstMaster;
}
@Override public DiscoveryNodes nodes() {
DiscoveryNodes latestNodes = this.latestDiscoNodes;
if (latestNodes != null) {
return latestNodes;
}
// have not decided yet, just send the local node
return newNodesBuilder().put(localNode).localNodeId(localNode.id()).build();
}
@Override public void publish(ClusterState clusterState) {
if (!master) {
throw new ElasticSearchIllegalStateException("Shouldn't publish state when not master");
}
latestDiscoNodes = clusterState.nodes();
nodesFD.updateNodes(clusterState.nodes());
publishClusterState.publish(clusterState);
}
private void handleNodeFailure(final DiscoveryNode node) {
if (!master) {
// nothing to do here...
return;
}
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + ")", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
.putAll(currentState.nodes())
.remove(node.id());
latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
sendInitialStateEventIfNeeded();
}
});
}
private void handleMasterGone(final DiscoveryNode masterNode, String reason) {
if (master) {
// we might get this on both a master telling us shutting down, and then the disconnect failure
return;
}
logger.info("Master [{}] left, reason [{}]", masterNode, reason);
List<DiscoveryNode> nodes = newArrayList(latestDiscoNodes.nodes().values());
nodes.remove(masterNode); // remove the master node from the list, it has failed
// sort then
DiscoveryNode electedMaster = electMaster.electMaster(nodes);
if (localNode.equals(electedMaster)) {
this.master = true;
masterFD.stop();
nodesFD.start();
clusterService.submitStateUpdateTask("zen-disco-elected_as_master(old master [" + masterNode + "])", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
.putAll(currentState.nodes())
// make sure the old master node, which has failed, is not part of the nodes we publish
.remove(masterNode.id())
.masterNodeId(localNode.id());
// update the fact that we are the master...
latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
sendInitialStateEventIfNeeded();
}
});
} else {
nodesFD.stop();
if (electedMaster != null) {
// we are not the master, start FD against the possible master
masterFD.restart(electedMaster);
} else {
masterFD.stop();
}
}
}
void handleNewClusterState(final ClusterState clusterState) {
if (master) {
logger.warn("Master should not receive new cluster state from [{}]", clusterState.nodes().masterNode());
} else {
latestDiscoNodes = clusterState.nodes();
// check to see that we monitor the correct master of the cluster
if (masterFD.masterNode() != null && masterFD.masterNode().equals(latestDiscoNodes.masterNode())) {
masterFD.restart(latestDiscoNodes.masterNode());
}
if (clusterState.nodes().localNode() == null) {
logger.warn("Received a cluster state from [{}] and not part of the cluster, should not happen", clusterState.nodes().masterNode());
} else {
clusterService.submitStateUpdateTask("zen-disco-receive(from [" + clusterState.nodes().masterNode() + "])", new ProcessedClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
return clusterState;
}
@Override public void clusterStateProcessed(ClusterState clusterState) {
sendInitialStateEventIfNeeded();
}
});
}
}
}
private void handleLeaveRequest(final DiscoveryNode node) {
if (master) {
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + ")", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
.putAll(currentState.nodes())
.remove(node.id());
latestDiscoNodes = builder.build();
return newClusterStateBuilder().state(currentState).nodes(latestDiscoNodes).build();
}
});
} else {
handleMasterGone(node, "shut_down");
}
}
private void handleJoinRequest(final DiscoveryNode node) {
if (!master) {
throw new ElasticSearchIllegalStateException("Node [" + localNode + "] not master for join request from [" + node + "]");
}
if (!transportService.addressSupported(node.address().getClass())) {
// TODO, what should we do now? Maybe inform that node that its crap?
logger.warn("Received a wrong address type from [{}], ignoring...", node);
} else {
clusterService.submitStateUpdateTask("zen-disco-receive(from node[" + node + "])", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().nodeExists(node.id())) {
// no change, the node already exists in the cluster
logger.warn("Received an existing node [{}]", node);
return currentState;
}
return newClusterStateBuilder().state(currentState).nodes(currentState.nodes().newNode(node)).build();
}
});
}
}
private DiscoveryNode pingTillMasterResolved() {
while (true) {
ZenPing.PingResponse[] pingResponses = pingService.pingAndWait(initialPingTimeout);
List<DiscoveryNode> pingMasters = newArrayList();
for (ZenPing.PingResponse pingResponse : pingResponses) {
if (pingResponse.master() != null) {
pingMasters.add(pingResponse.master());
}
}
if (pingMasters.isEmpty()) {
// lets tie break between discovered nodes
List<DiscoveryNode> possibleMasterNodes = newArrayList();
possibleMasterNodes.add(localNode);
for (ZenPing.PingResponse pingResponse : pingResponses) {
possibleMasterNodes.add(pingResponse.target());
}
DiscoveryNode electedMaster = electMaster.electMaster(possibleMasterNodes);
if (localNode.equals(electedMaster)) {
return localNode;
}
} else {
DiscoveryNode electedMaster = electMaster.electMaster(pingMasters);
if (electedMaster != null) {
return electedMaster;
}
}
}
}
private void sendInitialStateEventIfNeeded() {
if (initialStateSent.compareAndSet(false, true)) {
for (InitialStateDiscoveryListener listener : initialStateListeners) {
listener.initialStateProcessed();
}
}
}
private class NewClusterStateListener implements PublishClusterStateAction.NewClusterStateListener {
@Override public void onNewClusterState(ClusterState clusterState) {
handleNewClusterState(clusterState);
}
}
private class MembershipListener implements MembershipAction.MembershipListener {
@Override public void onJoin(DiscoveryNode node) {
handleJoinRequest(node);
}
@Override public void onLeave(DiscoveryNode node) {
handleLeaveRequest(node);
}
}
private class NodeFailureListener implements NodesFaultDetection.Listener {
@Override public void onNodeFailure(DiscoveryNode node) {
handleNodeFailure(node);
}
}
private class MasterNodeFailureListener implements MasterFaultDetection.Listener {
@Override public void onMasterFailure(DiscoveryNode masterNode) {
handleMasterGone(masterNode, "failure");
}
@Override public void onDisconnectedFromMaster() {
// got disconnected from the master, send a join request
membership.sendJoinRequest(latestDiscoNodes.masterNode(), localNode);
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen;
import com.google.inject.AbstractModule;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
/**
* @author kimchy (shay.banon)
*/
public class ZenDiscoveryModule extends AbstractModule {
@Override protected void configure() {
bind(ZenPingService.class).asEagerSingleton();
bind(Discovery.class).to(ZenDiscovery.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.elect;
import com.google.common.collect.Lists;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.settings.Settings;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import static com.google.common.collect.Lists.*;
/**
* @author kimchy (shay.banon)
*/
public class ElectMasterService extends AbstractComponent {
private final NodeComparator nodeComparator = new NodeComparator();
public ElectMasterService(Settings settings) {
super(settings);
}
/**
* Returns a list of the next possible masters.
*/
public DiscoveryNode[] nextPossibleMasters(Iterable<DiscoveryNode> nodes, int numberOfPossibleMasters) {
List<DiscoveryNode> sortedNodes = sortedNodes(nodes);
if (sortedNodes == null) {
return new DiscoveryNode[0];
}
List<DiscoveryNode> nextPossibleMasters = newArrayListWithExpectedSize(numberOfPossibleMasters);
int counter = 0;
for (DiscoveryNode nextPossibleMaster : sortedNodes) {
if (++counter >= numberOfPossibleMasters) {
break;
}
nextPossibleMasters.add(nextPossibleMaster);
}
return nextPossibleMasters.toArray(new DiscoveryNode[nextPossibleMasters.size()]);
}
/**
* Elects a new master out of the possible nodes, returning it. Returns <tt>null</tt>
* if no master has been elected.
*/
public DiscoveryNode electMaster(Iterable<DiscoveryNode> nodes) {
List<DiscoveryNode> sortedNodes = sortedNodes(nodes);
if (sortedNodes == null) {
return null;
}
return sortedNodes.get(0);
}
private List<DiscoveryNode> sortedNodes(Iterable<DiscoveryNode> nodes) {
List<DiscoveryNode> possibleNodes = Lists.newArrayList(nodes);
if (possibleNodes.isEmpty()) {
return null;
}
Collections.sort(possibleNodes, nodeComparator);
return possibleNodes;
}
private static class NodeComparator implements Comparator<DiscoveryNode> {
@Override public int compare(DiscoveryNode o1, DiscoveryNode o2) {
return o1.id().compareTo(o2.id());
}
}
}

View File

@ -0,0 +1,276 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.fd;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.node.DiscoveryNode.*;
import static org.elasticsearch.util.TimeValue.*;
/**
* @author kimchy (shay.banon)
*/
public class MasterFaultDetection extends AbstractComponent {
public static interface Listener {
void onMasterFailure(DiscoveryNode masterNode);
void onDisconnectedFromMaster();
}
private final ThreadPool threadPool;
private final TransportService transportService;
private final DiscoveryNodesProvider nodesProvider;
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
private final boolean connectOnNetworkDisconnect;
private final TimeValue pingInterval;
private final TimeValue pingRetryTimeout;
private final int pingRetryCount;
private final FDConnectionListener connectionListener;
private volatile DiscoveryNode masterNode;
private volatile int retryCount;
private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean();
public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, DiscoveryNodesProvider nodesProvider) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false);
this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(6));
this.pingRetryCount = componentSettings.getAsInt("ping_retries", 5);
this.connectionListener = new FDConnectionListener();
transportService.addConnectionListener(connectionListener);
transportService.registerHandler(MasterPingRequestHandler.ACTION, new MasterPingRequestHandler());
}
public DiscoveryNode masterNode() {
return this.masterNode;
}
public void addListener(Listener listener) {
listeners.add(listener);
}
public void removeListener(Listener listener) {
listeners.remove(listener);
}
public void restart(DiscoveryNode masterNode) {
stop();
start(masterNode);
}
public void start(DiscoveryNode masterNode) {
this.masterNode = masterNode;
this.retryCount = 0;
this.notifiedMasterFailure.set(false);
// try and connect to make sure we are connected
try {
transportService.connectToNode(masterNode);
} catch (Exception e) {
notifyMasterFailure(masterNode);
}
// start the ping process
threadPool.schedule(new SendPingRequest(), pingInterval);
}
public void stop() {
// also will stop the next ping schedule
this.retryCount = 0;
this.masterNode = null;
}
public void close() {
stop();
this.listeners.clear();
transportService.removeConnectionListener(connectionListener);
transportService.removeHandler(MasterPingRequestHandler.ACTION);
}
private void handleTransportDisconnect(DiscoveryNode node) {
if (!node.equals(this.masterNode)) {
return;
}
if (connectOnNetworkDisconnect) {
try {
transportService.connectToNode(node);
} catch (Exception e) {
logger.trace("Master [{}] failed on disconnect (with verified connect)", masterNode);
notifyMasterFailure(masterNode);
}
} else {
logger.trace("Master [{}] failed on disconnect", masterNode);
notifyMasterFailure(masterNode);
}
}
private void notifyDisconnectedFromMaster() {
for (Listener listener : listeners) {
listener.onDisconnectedFromMaster();
}
// we don't stop on disconnection from master, we keep pinging it
}
private void notifyMasterFailure(DiscoveryNode masterNode) {
if (notifiedMasterFailure.compareAndSet(false, true)) {
for (Listener listener : listeners) {
listener.onMasterFailure(masterNode);
}
stop();
}
}
private class FDConnectionListener implements TransportConnectionListener {
@Override public void onNodeConnected(DiscoveryNode node) {
}
@Override public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
}
}
private class SendPingRequest implements Runnable {
@Override public void run() {
if (masterNode != null) {
final DiscoveryNode sentToNode = masterNode;
transportService.sendRequest(masterNode, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode()), pingRetryTimeout,
new BaseTransportResponseHandler<MasterPingResponseResponse>() {
@Override public MasterPingResponseResponse newInstance() {
return new MasterPingResponseResponse();
}
@Override public void handleResponse(MasterPingResponseResponse response) {
// check if the master node did not get switched on us...
if (sentToNode.equals(MasterFaultDetection.this.masterNode())) {
if (!response.connectedToMaster) {
logger.trace("Master [{}] does not have us registered with it...", masterNode);
notifyDisconnectedFromMaster();
} else {
threadPool.schedule(SendPingRequest.this, pingInterval);
}
}
}
@Override public void handleException(RemoteTransportException exp) {
// check if the master node did not get switched on us...
if (sentToNode.equals(MasterFaultDetection.this.masterNode())) {
int retryCount = ++MasterFaultDetection.this.retryCount;
logger.trace("Master [{}] failed to ping, retry [{}] out of [{}]", exp, masterNode, retryCount, pingRetryCount);
if (retryCount >= pingRetryCount) {
logger.trace("Master [{}] failed on ping", masterNode);
// not good, failure
notifyMasterFailure(sentToNode);
}
}
}
});
}
}
}
private class MasterPingRequestHandler extends BaseTransportRequestHandler<MasterPingRequest> {
public static final String ACTION = "discovery/zen/fd/masterPing";
@Override public MasterPingRequest newInstance() {
return new MasterPingRequest();
}
@Override public void messageReceived(MasterPingRequest request, TransportChannel channel) throws Exception {
DiscoveryNodes nodes = nodesProvider.nodes();
channel.sendResponse(new MasterPingResponseResponse(nodes.nodeExists(request.node.id())));
}
}
private class MasterPingRequest implements Streamable {
private DiscoveryNode node;
private MasterPingRequest() {
}
private MasterPingRequest(DiscoveryNode node) {
this.node = node;
}
@Override public void readFrom(StreamInput in) throws IOException {
node = readNode(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
}
}
private class MasterPingResponseResponse implements Streamable {
private boolean connectedToMaster;
private MasterPingResponseResponse() {
}
private MasterPingResponseResponse(boolean connectedToMaster) {
this.connectedToMaster = connectedToMaster;
}
@Override public void readFrom(StreamInput in) throws IOException {
connectedToMaster = in.readBoolean();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(connectedToMaster);
}
}
}

View File

@ -0,0 +1,269 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.fd;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import static org.elasticsearch.cluster.node.DiscoveryNodes.*;
import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*;
/**
* @author kimchy (shay.banon)
*/
public class NodesFaultDetection extends AbstractComponent {
public static interface Listener {
void onNodeFailure(DiscoveryNode node);
}
private final ThreadPool threadPool;
private final TransportService transportService;
private final boolean connectOnNetworkDisconnect;
private final TimeValue pingInterval;
private final TimeValue pingRetryTimeout;
private final int pingRetryCount;
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();
private final FDConnectionListener connectionListener;
private volatile DiscoveryNodes latestNodes = EMPTY_NODES;
private volatile boolean running = false;
public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.connectOnNetworkDisconnect = componentSettings.getAsBoolean("connect_on_network_disconnect", false);
this.pingInterval = componentSettings.getAsTime("ping_interval", timeValueSeconds(1));
this.pingRetryTimeout = componentSettings.getAsTime("ping_timeout", timeValueSeconds(6));
this.pingRetryCount = componentSettings.getAsInt("ping_retries", 5);
transportService.registerHandler(PingRequestHandler.ACTION, new PingRequestHandler());
this.connectionListener = new FDConnectionListener();
transportService.addConnectionListener(connectionListener);
}
public void addListener(Listener listener) {
listeners.add(listener);
}
public void removeListener(Listener listener) {
listeners.remove(listener);
}
public void updateNodes(DiscoveryNodes nodes) {
DiscoveryNodes prevNodes = latestNodes;
this.latestNodes = nodes;
if (!running) {
return;
}
DiscoveryNodes.Delta delta = nodes.delta(prevNodes);
for (DiscoveryNode newNode : delta.addedNodes()) {
if (!nodesFD.containsKey(newNode)) {
nodesFD.put(newNode, new NodeFD());
threadPool.schedule(new SendPingRequest(newNode), pingInterval);
}
}
for (DiscoveryNode removedNode : delta.removedNodes()) {
nodesFD.remove(removedNode);
}
}
public NodesFaultDetection start() {
if (running) {
return this;
}
running = true;
return this;
}
public NodesFaultDetection stop() {
if (!running) {
return this;
}
running = false;
return this;
}
public void close() {
stop();
transportService.removeHandler(PingRequestHandler.ACTION);
transportService.removeConnectionListener(connectionListener);
}
private void handleTransportDisconnect(DiscoveryNode node) {
if (!latestNodes.nodeExists(node.id())) {
return;
}
NodeFD nodeFD = nodesFD.remove(node);
if (nodeFD == null) {
return;
}
if (!running) {
return;
}
if (connectOnNetworkDisconnect) {
try {
transportService.connectToNode(node);
} catch (Exception e) {
logger.trace("Node [{}] failed on disconnect (with verified connect)", node);
notifyNodeFailure(node);
}
} else {
logger.trace("Node [{}] failed on disconnect", node);
notifyNodeFailure(node);
}
}
private void notifyNodeFailure(DiscoveryNode node) {
for (Listener listener : listeners) {
listener.onNodeFailure(node);
}
}
private class SendPingRequest implements Runnable {
private final DiscoveryNode node;
private SendPingRequest(DiscoveryNode node) {
this.node = node;
}
@Override public void run() {
if (!running) {
return;
}
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(), pingRetryTimeout,
new BaseTransportResponseHandler<PingResponse>() {
@Override public PingResponse newInstance() {
return new PingResponse();
}
@Override public void handleResponse(PingResponse response) {
if (running) {
NodeFD nodeFD = nodesFD.get(node);
if (nodeFD != null) {
nodeFD.retryCount = 0;
threadPool.schedule(SendPingRequest.this, pingInterval);
}
}
}
@Override public void handleException(RemoteTransportException exp) {
// check if the master node did not get switched on us...
if (running) {
NodeFD nodeFD = nodesFD.get(node);
if (nodeFD != null) {
int retryCount = ++nodeFD.retryCount;
logger.trace("Node [{}] failed to ping, retry [{}] out of [{}]", exp, node, retryCount, pingRetryCount);
if (retryCount >= pingRetryCount) {
logger.trace("Node [{}] failed on ping", node);
// not good, failure
if (nodesFD.remove(node) != null) {
notifyNodeFailure(node);
}
}
}
}
}
});
}
}
static class NodeFD {
volatile int retryCount;
}
private class FDConnectionListener implements TransportConnectionListener {
@Override public void onNodeConnected(DiscoveryNode node) {
}
@Override public void onNodeDisconnected(DiscoveryNode node) {
handleTransportDisconnect(node);
}
}
private class PingRequestHandler extends BaseTransportRequestHandler<PingRequest> {
public static final String ACTION = "discovery/zen/fd/ping";
@Override public PingRequest newInstance() {
return new PingRequest();
}
@Override public void messageReceived(PingRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(new PingResponse());
}
}
private class PingRequest implements Streamable {
private PingRequest() {
}
@Override public void readFrom(StreamInput in) throws IOException {
}
@Override public void writeTo(StreamOutput out) throws IOException {
}
}
private class PingResponse implements Streamable {
private PingResponse() {
}
@Override public void readFrom(StreamInput in) throws IOException {
}
@Override public void writeTo(StreamOutput out) throws IOException {
}
}
}

View File

@ -0,0 +1,152 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.membership;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author kimchy (shay.banon)
*/
public class MembershipAction extends AbstractComponent {
public static interface MembershipListener {
void onJoin(DiscoveryNode node);
void onLeave(DiscoveryNode node);
}
private final TransportService transportService;
private final MembershipListener listener;
public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener) {
super(settings);
this.transportService = transportService;
this.listener = listener;
transportService.registerHandler(JoinRequestRequestHandler.ACTION, new JoinRequestRequestHandler());
transportService.registerHandler(LeaveRequestRequestHandler.ACTION, new LeaveRequestRequestHandler());
}
public void close() {
transportService.removeHandler(JoinRequestRequestHandler.ACTION);
transportService.removeHandler(LeaveRequestRequestHandler.ACTION);
}
public void sendLeaveRequest(DiscoveryNode masterNode, DiscoveryNode node) {
transportService.sendRequest(node, LeaveRequestRequestHandler.ACTION, new LeaveRequest(masterNode), VoidTransportResponseHandler.INSTANCE_NOSPAWN);
}
public void sendLeaveRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticSearchException, TimeoutException {
transportService.submitRequest(masterNode, LeaveRequestRequestHandler.ACTION, new LeaveRequest(node), VoidTransportResponseHandler.INSTANCE_NOSPAWN).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
}
public void sendJoinRequest(DiscoveryNode masterNode, DiscoveryNode node) {
transportService.sendRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node), VoidTransportResponseHandler.INSTANCE_NOSPAWN);
}
public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) throws ElasticSearchException, TimeoutException {
transportService.submitRequest(masterNode, JoinRequestRequestHandler.ACTION, new JoinRequest(node), VoidTransportResponseHandler.INSTANCE_NOSPAWN).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
}
private static class JoinRequest implements Streamable {
private DiscoveryNode node;
private JoinRequest() {
}
private JoinRequest(DiscoveryNode node) {
this.node = node;
}
@Override public void readFrom(StreamInput in) throws IOException {
node = DiscoveryNode.readNode(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
}
}
private class JoinRequestRequestHandler extends BaseTransportRequestHandler<JoinRequest> {
static final String ACTION = "discovery/zen/join";
@Override public JoinRequest newInstance() {
return new JoinRequest();
}
@Override public void messageReceived(JoinRequest request, TransportChannel channel) throws Exception {
listener.onJoin(request.node);
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
private static class LeaveRequest implements Streamable {
private DiscoveryNode node;
private LeaveRequest() {
}
private LeaveRequest(DiscoveryNode node) {
this.node = node;
}
@Override public void readFrom(StreamInput in) throws IOException {
node = DiscoveryNode.readNode(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
}
}
private class LeaveRequestRequestHandler extends BaseTransportRequestHandler<LeaveRequest> {
static final String ACTION = "discovery/zen/leave";
@Override public LeaveRequest newInstance() {
return new LeaveRequest();
}
@Override public void messageReceived(LeaveRequest request, TransportChannel channel) throws Exception {
listener.onLeave(request.node);
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.ping;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.LifecycleComponent;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import java.io.IOException;
import static org.elasticsearch.cluster.node.DiscoveryNode.*;
/**
* @author kimchy (shay.banon)
*/
public interface ZenPing extends LifecycleComponent<ZenPing> {
void setNodesProvider(DiscoveryNodesProvider nodesProvider);
void ping(PingListener listener, TimeValue timeout) throws ElasticSearchException;
public interface PingListener {
void onPing(PingResponse[] pings);
}
public class PingResponse implements Streamable {
private DiscoveryNode target;
private DiscoveryNode master;
public PingResponse() {
}
public PingResponse(DiscoveryNode target, DiscoveryNode master) {
this.target = target;
this.master = master;
}
public DiscoveryNode target() {
return target;
}
public DiscoveryNode master() {
return master;
}
@Override public void readFrom(StreamInput in) throws IOException {
target = readNode(in);
if (in.readBoolean()) {
master = readNode(in);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
target.writeTo(out);
if (master == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
master.writeTo(out);
}
}
@Override public String toString() {
return "ping_response target [" + target + "], master [" + master + "]";
}
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.ping;
import org.elasticsearch.discovery.DiscoveryException;
/**
* @author kimchy (shay.banon)
*/
public class ZenPingException extends DiscoveryException {
public ZenPingException(String message) {
super(message);
}
public ZenPingException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.ping;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.settings.Settings;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author kimchy (shay.banon)
*/
public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implements ZenPing {
private volatile ImmutableList<? extends ZenPing> zenPings = ImmutableList.of();
@Inject public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
super(settings);
this.zenPings = ImmutableList.of(new MulticastZenPing(settings, threadPool, transportService, clusterName));
}
public ImmutableList<? extends ZenPing> zenPings() {
return this.zenPings;
}
public void zenPings(ImmutableList<? extends ZenPing> pings) {
this.zenPings = pings;
if (lifecycle.started()) {
for (ZenPing zenPing : zenPings) {
zenPing.start();
}
} else if (lifecycle.stopped()) {
for (ZenPing zenPing : zenPings) {
zenPing.stop();
}
}
}
@Override public void setNodesProvider(DiscoveryNodesProvider nodesProvider) {
if (lifecycle.started()) {
throw new ElasticSearchIllegalStateException("Can't set nodes provider when started");
}
for (ZenPing zenPing : zenPings) {
zenPing.setNodesProvider(nodesProvider);
}
}
@Override protected void doStart() throws ElasticSearchException {
for (ZenPing zenPing : zenPings) {
zenPing.start();
}
}
@Override protected void doStop() throws ElasticSearchException {
for (ZenPing zenPing : zenPings) {
zenPing.stop();
}
}
@Override protected void doClose() throws ElasticSearchException {
for (ZenPing zenPing : zenPings) {
zenPing.close();
}
}
public PingResponse[] pingAndWait(TimeValue timeout) {
final AtomicReference<PingResponse[]> response = new AtomicReference<PingResponse[]>();
final CountDownLatch latch = new CountDownLatch(1);
ping(new PingListener() {
@Override public void onPing(PingResponse[] pings) {
response.set(pings);
latch.countDown();
}
}, timeout);
try {
latch.await();
return response.get();
} catch (InterruptedException e) {
return null;
}
}
@Override public void ping(PingListener listener, TimeValue timeout) throws ElasticSearchException {
ImmutableList<? extends ZenPing> zenPings = this.zenPings;
CompoundPingListener compoundPingListener = new CompoundPingListener(listener, zenPings);
for (ZenPing zenPing : zenPings) {
zenPing.ping(compoundPingListener, timeout);
}
}
private static class CompoundPingListener implements PingListener {
private final PingListener listener;
private final ImmutableList<? extends ZenPing> zenPings;
private final AtomicInteger counter;
private ConcurrentMap<DiscoveryNode, PingResponse> responses = new ConcurrentHashMap<DiscoveryNode, PingResponse>();
private CompoundPingListener(PingListener listener, ImmutableList<? extends ZenPing> zenPings) {
this.listener = listener;
this.zenPings = zenPings;
this.counter = new AtomicInteger(zenPings.size());
}
@Override public void onPing(PingResponse[] pings) {
if (pings != null) {
for (PingResponse pingResponse : pings) {
responses.put(pingResponse.target(), pingResponse);
}
}
if (counter.decrementAndGet() == 0) {
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
}
}
}
}

View File

@ -0,0 +1,359 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.ping.multicast;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.discovery.DiscoveryException;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.io.stream.*;
import org.elasticsearch.util.settings.ImmutableSettings;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.net.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.cluster.node.DiscoveryNode.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*;
import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
import static org.elasticsearch.util.io.NetworkUtils.*;
/**
* @author kimchy (shay.banon)
*/
public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implements ZenPing {
private final String address;
private final int port;
private final String group;
private final int bufferSize;
private final int ttl;
private final ThreadPool threadPool;
private final TransportService transportService;
private final ClusterName clusterName;
private volatile DiscoveryNodesProvider nodesProvider;
private volatile Receiver receiver;
private volatile Thread receiverThread;
private MulticastSocket multicastSocket;
private DatagramPacket datagramPacketSend;
private DatagramPacket datagramPacketReceive;
private final AtomicInteger pingIdGenerator = new AtomicInteger();
private final Map<Integer, ConcurrentMap<DiscoveryNode, PingResponse>> receivedResponses = newConcurrentMap();
private final Object sendMutex = new Object();
private final Object receiveMutex = new Object();
public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
this(ImmutableSettings.Builder.EMPTY_SETTINGS, threadPool, transportService, clusterName);
}
public MulticastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = clusterName;
this.address = componentSettings.get("address");
this.port = componentSettings.getAsInt("port", 54328);
this.group = componentSettings.get("group", "224.2.2.4");
this.bufferSize = componentSettings.getAsInt("buffer_size", 2048);
this.ttl = componentSettings.getAsInt("ttl", 3);
this.transportService.registerHandler(PingResponseRequestHandler.ACTION, new PingResponseRequestHandler());
}
@Override public void setNodesProvider(DiscoveryNodesProvider nodesProvider) {
if (lifecycle.started()) {
throw new ElasticSearchIllegalStateException("Can't set nodes provider when started");
}
this.nodesProvider = nodesProvider;
}
@Override protected void doStart() throws ElasticSearchException {
try {
this.datagramPacketReceive = new DatagramPacket(new byte[bufferSize], bufferSize);
this.datagramPacketSend = new DatagramPacket(new byte[bufferSize], bufferSize, InetAddress.getByName(group), port);
} catch (Exception e) {
throw new DiscoveryException("Failed to set datagram packets", e);
}
try {
MulticastSocket multicastSocket = new MulticastSocket(null);
multicastSocket.setReuseAddress(true);
// bind to receive interface
multicastSocket.bind(new InetSocketAddress(port));
multicastSocket.setTimeToLive(ttl);
// set the send interface
InetAddress multicastInterface = resolvePublishHostAddress(address, settings);
multicastSocket.setInterface(multicastInterface);
multicastSocket.setReceiveBufferSize(bufferSize);
multicastSocket.setSendBufferSize(bufferSize);
multicastSocket.joinGroup(InetAddress.getByName(group));
multicastSocket.setSoTimeout(60000);
this.multicastSocket = multicastSocket;
} catch (Exception e) {
throw new DiscoveryException("Failed to setup multicast socket", e);
}
this.receiver = new Receiver();
this.receiverThread = daemonThreadFactory(settings, "discovery#multicast#received").newThread(receiver);
this.receiverThread.start();
}
@Override protected void doStop() throws ElasticSearchException {
receiver.stop();
receiverThread.interrupt();
multicastSocket.close();
}
@Override protected void doClose() throws ElasticSearchException {
}
public PingResponse[] pingAndWait(TimeValue timeout) {
final AtomicReference<PingResponse[]> response = new AtomicReference<PingResponse[]>();
final CountDownLatch latch = new CountDownLatch(1);
ping(new PingListener() {
@Override public void onPing(PingResponse[] pings) {
response.set(pings);
latch.countDown();
}
}, timeout);
try {
latch.await();
return response.get();
} catch (InterruptedException e) {
return null;
}
}
@Override public void ping(final PingListener listener, final TimeValue timeout) {
final int id = pingIdGenerator.incrementAndGet();
receivedResponses.put(id, new ConcurrentHashMap<DiscoveryNode, PingResponse>());
sendPingRequest(id);
// try and send another ping request halfway through (just in case someone woke up during it...)
// this can be a good trade-off to nailing the initial lookup or un-delivered messages
threadPool.schedule(new Runnable() {
@Override public void run() {
try {
sendPingRequest(id);
} catch (Exception e) {
logger.debug("[{}] Failed to send second ping request", e, id);
}
}
}, timeout.millis() / 2, TimeUnit.MILLISECONDS);
threadPool.schedule(new Runnable() {
@Override public void run() {
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.remove(id);
listener.onPing(responses.values().toArray(new PingResponse[responses.size()]));
}
}, timeout.millis(), TimeUnit.MILLISECONDS);
}
private void sendPingRequest(int id) {
synchronized (sendMutex) {
try {
HandlesStreamOutput out = BytesStreamOutput.Cached.cachedHandles();
out.writeInt(id);
clusterName.writeTo(out);
nodesProvider.nodes().localNode().writeTo(out);
datagramPacketSend.setData(((BytesStreamOutput) out.wrappedOut()).copiedByteArray());
} catch (IOException e) {
receivedResponses.remove(id);
throw new ZenPingException("Failed to serialize ping request", e);
}
try {
multicastSocket.send(datagramPacketSend);
if (logger.isTraceEnabled()) {
logger.trace("[{}] Sending ping request", id);
}
} catch (IOException e) {
receivedResponses.remove(id);
throw new ZenPingException("Failed to send ping request over multicast", e);
}
}
}
private class PingResponseRequestHandler extends BaseTransportRequestHandler<WrappedPingResponse> {
static final String ACTION = "discovery/zen/multicast";
@Override public WrappedPingResponse newInstance() {
return new WrappedPingResponse();
}
@Override public void messageReceived(WrappedPingResponse request, TransportChannel channel) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("[{}] Received {}", request.id, request.pingResponse);
}
ConcurrentMap<DiscoveryNode, PingResponse> responses = receivedResponses.get(request.id);
if (responses == null) {
logger.warn("Received ping response with no matching id [{}]", request.id);
} else {
responses.put(request.pingResponse.target(), request.pingResponse);
}
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
class WrappedPingResponse implements Streamable {
int id;
PingResponse pingResponse;
WrappedPingResponse() {
}
WrappedPingResponse(int id, PingResponse pingResponse) {
this.id = id;
this.pingResponse = pingResponse;
}
@Override public void readFrom(StreamInput in) throws IOException {
id = in.readInt();
pingResponse = new PingResponse();
pingResponse.readFrom(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeInt(id);
pingResponse.writeTo(out);
}
}
private class Receiver implements Runnable {
private volatile boolean running = true;
public void stop() {
running = false;
}
@Override public void run() {
while (running) {
try {
int id;
DiscoveryNode requestingNodeX;
ClusterName clusterName;
synchronized (receiveMutex) {
try {
multicastSocket.receive(datagramPacketReceive);
} catch (SocketTimeoutException ignore) {
continue;
} catch (Exception e) {
if (running) {
logger.warn("Failed to receive packet", e);
}
continue;
}
try {
StreamInput input = HandlesStreamInput.Cached.cached(new BytesStreamInput(datagramPacketReceive.getData(), datagramPacketReceive.getOffset(), datagramPacketReceive.getLength()));
id = input.readInt();
clusterName = ClusterName.readClusterName(input);
requestingNodeX = readNode(input);
} catch (Exception e) {
logger.warn("Failed to read requesting node from {}", e, datagramPacketReceive.getSocketAddress());
continue;
}
}
DiscoveryNodes discoveryNodes = nodesProvider.nodes();
final DiscoveryNode requestingNode = requestingNodeX;
if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
// that's me, ignore
continue;
}
if (!clusterName.equals(MulticastZenPing.this.clusterName)) {
// not our cluster, ignore it...
continue;
}
final WrappedPingResponse wrappedPingResponse = new WrappedPingResponse();
wrappedPingResponse.id = id;
wrappedPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode());
if (logger.isTraceEnabled()) {
logger.trace("[{}] Received ping_request from [{}], sending {}", id, requestingNode, wrappedPingResponse.pingResponse);
}
if (!transportService.nodeConnected(requestingNode)) {
// do the connect and send on a thread pool
threadPool.execute(new Runnable() {
@Override public void run() {
// connect to the node if possible
try {
transportService.connectToNode(requestingNode);
} catch (Exception e) {
logger.warn("Failed to connect to requesting node {}", e, requestingNode);
}
transportService.sendRequest(requestingNode, PingResponseRequestHandler.ACTION, wrappedPingResponse, new VoidTransportResponseHandler(false) {
@Override public void handleException(RemoteTransportException exp) {
logger.warn("Failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
});
}
});
} else {
transportService.sendRequest(requestingNode, PingResponseRequestHandler.ACTION, wrappedPingResponse, new VoidTransportResponseHandler(false) {
@Override public void handleException(RemoteTransportException exp) {
logger.warn("Failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
}
});
}
} catch (Exception e) {
logger.warn("Unexpected exception in multicast receiver", e);
}
}
}
}
}

View File

@ -0,0 +1,111 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.publish;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.io.stream.VoidStreamable;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class PublishClusterStateAction extends AbstractComponent {
public static interface NewClusterStateListener {
void onNewClusterState(ClusterState clusterState);
}
private final TransportService transportService;
private final DiscoveryNodesProvider nodesProvider;
private final NewClusterStateListener listener;
public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
NewClusterStateListener listener) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.listener = listener;
transportService.registerHandler(PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequestHandler());
}
public void close() {
transportService.removeHandler(PublishClusterStateRequestHandler.ACTION);
}
public void publish(ClusterState clusterState) {
DiscoveryNode localNode = nodesProvider.nodes().localNode();
for (final DiscoveryNode node : clusterState.nodes()) {
if (node.equals(localNode)) {
// no need to send to our self
continue;
}
transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequest(clusterState), new VoidTransportResponseHandler(false) {
@Override public void handleException(RemoteTransportException exp) {
logger.warn("Failed to send cluster state to [{}], should be detected as failed soon...", exp, node);
}
});
}
}
private class PublishClusterStateRequest implements Streamable {
private ClusterState clusterState;
private PublishClusterStateRequest() {
}
private PublishClusterStateRequest(ClusterState clusterState) {
this.clusterState = clusterState;
}
@Override public void readFrom(StreamInput in) throws IOException {
clusterState = ClusterState.Builder.readFrom(in, settings, nodesProvider.nodes().localNode());
}
@Override public void writeTo(StreamOutput out) throws IOException {
ClusterState.Builder.writeTo(clusterState, out);
}
}
private class PublishClusterStateRequestHandler extends BaseTransportRequestHandler<PublishClusterStateRequest> {
static final String ACTION = "discovery/zen/publish";
@Override public PublishClusterStateRequest newInstance() {
return new PublishClusterStateRequest();
}
@Override public void messageReceived(PublishClusterStateRequest request, TransportChannel channel) throws Exception {
listener.onNewClusterState(request.clusterState);
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.io.NetworkUtils;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.BoundTransportAddress;
import org.elasticsearch.util.transport.InetSocketTransportAddress;
@ -49,7 +50,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
import static org.elasticsearch.util.io.HostResolver.*;
import static org.elasticsearch.util.io.NetworkUtils.*;
/**
* @author kimchy (shay.banon)
@ -103,7 +104,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
this.publishHost = componentSettings.get("publish_host");
this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", true);
this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", null);
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", null);
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", NetworkUtils.defaultReuseAddress());
this.tcpSendBufferSize = componentSettings.getAsSize("tcp_send_buffer_size", null);
this.tcpReceiveBufferSize = componentSettings.getAsSize("tcp_receive_buffer_size", null);
@ -189,17 +190,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
InetSocketAddress publishAddress;
try {
InetAddress publishAddressX = resolvePublishHostAddress(publishHost, settings);
if (publishAddressX == null) {
// if its 0.0.0.0, we can't publish that.., default to the local ip address
if (boundAddress.getAddress().isAnyLocalAddress()) {
publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort());
} else {
publishAddress = boundAddress;
}
} else {
publishAddress = new InetSocketAddress(publishAddressX, boundAddress.getPort());
}
publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings), boundAddress.getPort());
} catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e);
}

View File

@ -22,7 +22,7 @@ package org.elasticsearch.index.query.json;
import org.elasticsearch.util.json.ToJson;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public interface JsonFilterBuilder extends ToJson {

View File

@ -19,7 +19,7 @@
package org.elasticsearch.jmx;
import org.elasticsearch.util.io.HostResolver;
import org.elasticsearch.util.io.NetworkUtils;
import org.elasticsearch.util.logging.ESLogger;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.PortsRange;
@ -36,8 +36,6 @@ import java.rmi.registry.LocateRegistry;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.util.io.HostResolver.*;
/**
* @author kimchy (Shay Banon)
*/
@ -116,7 +114,7 @@ public class JmxService {
connectorServer.start();
// create the publish url
String publishHost = HostResolver.resolvePublishHostAddress(settings.get("jmx.publishHost"), settings, LOCAL_IP).getHostAddress();
String publishHost = NetworkUtils.resolvePublishHostAddress(settings.get("jmx.publishHost"), settings).getHostAddress();
publishUrl = settings.get("jmx.publishUrl", JMXRMI_PUBLISH_URI_PATTERN).replace("{jmx.port}", Integer.toString(portNumber)).replace("{jmx.host}", publishHost);
} catch (Exception e) {
lastException.set(e);

View File

@ -46,6 +46,7 @@ import org.elasticsearch.jmx.JmxModule;
import org.elasticsearch.jmx.JmxService;
import org.elasticsearch.monitor.MonitorModule;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.monitor.jvm.JvmConfig;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.PluginsModule;
import org.elasticsearch.plugins.PluginsService;
@ -101,7 +102,7 @@ public final class InternalNode implements Node {
Tuple<Settings, Environment> tuple = InternalSettingsPerparer.prepareSettings(pSettings, loadConfigSettings);
ESLogger logger = Loggers.getLogger(Node.class, tuple.v1().get("name"));
logger.info("{{}}: Initializing ...", Version.full());
logger.info("{{}}[{}]: Initializing ...", Version.full(), JvmConfig.jvmConfig().pid());
this.pluginsService = new PluginsService(tuple.v1(), tuple.v2());
this.settings = pluginsService.updatedSettings();
@ -135,7 +136,7 @@ public final class InternalNode implements Node {
client = injector.getInstance(Client.class);
logger.info("{{}}: Initialized", Version.full());
logger.info("{{}}[{}]: Initialized", Version.full(), JvmConfig.jvmConfig().pid());
}
@Override public Settings settings() {
@ -152,7 +153,7 @@ public final class InternalNode implements Node {
}
ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
logger.info("{{}}: Starting ...", Version.full());
logger.info("{{}}[{}]: Starting ...", Version.full(), JvmConfig.jvmConfig().pid());
for (Class<? extends LifecycleComponent> plugin : pluginsService.services()) {
injector.getInstance(plugin).start();
@ -175,7 +176,7 @@ public final class InternalNode implements Node {
}
injector.getInstance(JmxService.class).connectAndRegister(discoService.nodeDescription());
logger.info("{{}}: Started", Version.full());
logger.info("{{}}[{}]: Started", Version.full(), JvmConfig.jvmConfig().pid());
return this;
}
@ -185,7 +186,7 @@ public final class InternalNode implements Node {
return this;
}
ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
logger.info("{{}}: Stopping ...", Version.full());
logger.info("{{}}[{}]: Stopping ...", Version.full(), JvmConfig.jvmConfig().pid());
if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).stop();
@ -215,7 +216,7 @@ public final class InternalNode implements Node {
Injectors.close(injector);
logger.info("{{}}: Stopped", Version.full());
logger.info("{{}}[{}]: Stopped", Version.full(), JvmConfig.jvmConfig().pid());
return this;
}
@ -229,7 +230,7 @@ public final class InternalNode implements Node {
}
ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
logger.info("{{}}: Closing ...", Version.full());
logger.info("{{}}[{}]: Closing ...", Version.full(), JvmConfig.jvmConfig().pid());
if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).close();
@ -264,7 +265,7 @@ public final class InternalNode implements Node {
ThreadLocals.clearReferencesThreadLocals();
logger.info("{{}}: Closed", Version.full());
logger.info("{{}}[{}]: Closed", Version.full(), JvmConfig.jvmConfig().pid());
}
public Injector injector() {

View File

@ -39,5 +39,7 @@ public interface ThreadPool extends ScheduledExecutorService {
Future<?> submit(Runnable task, FutureListener<?> listener);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval);
public ScheduledFuture<?> schedule(Runnable command, TimeValue delay);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval);
}

View File

@ -120,6 +120,10 @@ public abstract class AbstractThreadPool extends AbstractComponent implements Th
return executorService.submit(new FutureRunnable(task, null, listener));
}
@Override public ScheduledFuture<?> schedule(Runnable command, TimeValue delay) {
return schedule(command, delay.millis(), TimeUnit.MILLISECONDS);
}
@Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) {
return scheduleWithFixedDelay(command, interval.millis(), interval.millis(), TimeUnit.MILLISECONDS);
}

View File

@ -23,6 +23,7 @@ import com.google.inject.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.settings.ImmutableSettings;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.timer.HashedWheelTimer;
import org.elasticsearch.util.timer.Timeout;
@ -50,6 +51,12 @@ public class TimerService extends AbstractComponent {
private final TimeValue tickDuration;
private final int ticksPerWheel;
public TimerService(ThreadPool threadPool) {
this(ImmutableSettings.Builder.EMPTY_SETTINGS, threadPool);
}
@Inject public TimerService(Settings settings, ThreadPool threadPool) {
super(settings);
this.threadPool = threadPool;
@ -58,8 +65,9 @@ public class TimerService extends AbstractComponent {
this.timeEstimatorFuture = threadPool.scheduleWithFixedDelay(timeEstimator, 50, 50, TimeUnit.MILLISECONDS);
this.tickDuration = componentSettings.getAsTime("tick_duration", timeValueMillis(100));
this.ticksPerWheel = componentSettings.getAsInt("ticks_per_wheel", 1024);
this.timer = new HashedWheelTimer(logger, daemonThreadFactory(settings, "timer"), tickDuration.millis(), TimeUnit.MILLISECONDS);
this.timer = new HashedWheelTimer(logger, daemonThreadFactory(settings, "timer"), tickDuration.millis(), TimeUnit.MILLISECONDS, ticksPerWheel);
}
public void close() {

View File

@ -22,7 +22,7 @@ package org.elasticsearch.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class ConnectTransportException extends TransportException {

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
/**
* @author kimchy (shay.banon)
*/
public class NodeDisconnectedTransportException extends RemoteTransportException {
public NodeDisconnectedTransportException(DiscoveryNode node, String action) {
super(node.name(), node.address(), action, null);
}
// @Override public Throwable fillInStackTrace() {
// return fillStack();
// }
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
/**
* @author kimchy (shay.banon)
*/
public class ReceiveTimeoutTransportException extends RemoteTransportException {
public ReceiveTimeoutTransportException(DiscoveryNode node, String action) {
super(node.name(), node.address(), action, null);
}
// @Override public Throwable fillInStackTrace() {
// return fillStack();
// }
}

View File

@ -28,9 +28,9 @@ public interface TransportResponseHandler<T extends Streamable> {
/**
* creates a new instance of the return type from the remote call.
* called by the infra before deserializing the response.
* called by the infra before de-serializing the response.
*
* @return a new reponse copy.
* @return a new response copy.
*/
T newInstance();

View File

@ -23,13 +23,18 @@ import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashMapLong;
import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.timer.Timeout;
import org.elasticsearch.util.timer.TimerTask;
import org.elasticsearch.util.transport.BoundTransportAddress;
import org.elasticsearch.util.transport.TransportAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
@ -46,9 +51,11 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
private final ThreadPool threadPool;
private final TimerService timerService;
final ConcurrentMap<String, TransportRequestHandler> serverHandlers = newConcurrentMap();
final NonBlockingHashMapLong<TransportResponseHandler> clientHandlers = new NonBlockingHashMapLong<TransportResponseHandler>();
final NonBlockingHashMapLong<RequestHolder> clientHandlers = new NonBlockingHashMapLong<RequestHolder>();
final AtomicLong requestIds = new AtomicLong();
@ -56,39 +63,20 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
private boolean throwConnectException = false;
public TransportService(Transport transport, ThreadPool threadPool) {
this(EMPTY_SETTINGS, transport, threadPool);
public TransportService(Transport transport, ThreadPool threadPool, TimerService timerService) {
this(EMPTY_SETTINGS, transport, threadPool, timerService);
}
@Inject public TransportService(Settings settings, Transport transport, ThreadPool threadPool) {
@Inject public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TimerService timerService) {
super(settings);
this.transport = transport;
this.threadPool = threadPool;
this.timerService = timerService;
}
@Override protected void doStart() throws ElasticSearchException {
// register us as an adapter for the transport service
transport.transportServiceAdapter(new TransportServiceAdapter() {
@Override public TransportRequestHandler handler(String action) {
return serverHandlers.get(action);
}
@Override public TransportResponseHandler remove(long requestId) {
return clientHandlers.remove(requestId);
}
@Override public void raiseNodeConnected(DiscoveryNode node) {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeConnected(node);
}
}
@Override public void raiseNodeDisconnected(DiscoveryNode node) {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeDisconnected(node);
}
}
});
transport.transportServiceAdapter(new Adapter());
transport.start();
if (transport.boundAddress() != null && logger.isInfoEnabled()) {
logger.info("{}", transport.boundAddress());
@ -144,16 +132,30 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
public <T extends Streamable> TransportFuture<T> submitRequest(DiscoveryNode node, String action, Streamable message,
TransportResponseHandler<T> handler) throws TransportException {
return submitRequest(node, action, message, null, handler);
}
public <T extends Streamable> TransportFuture<T> submitRequest(DiscoveryNode node, String action, Streamable message,
TimeValue timeout, TransportResponseHandler<T> handler) throws TransportException {
PlainTransportFuture<T> futureHandler = new PlainTransportFuture<T>(handler);
sendRequest(node, action, message, futureHandler);
sendRequest(node, action, message, timeout, futureHandler);
return futureHandler;
}
public <T extends Streamable> void sendRequest(final DiscoveryNode node, final String action, final Streamable message,
final TransportResponseHandler<T> handler) throws TransportException {
sendRequest(node, action, message, null, handler);
}
public <T extends Streamable> void sendRequest(final DiscoveryNode node, final String action, final Streamable message,
final TimeValue timeout, final TransportResponseHandler<T> handler) throws TransportException {
final long requestId = newRequestId();
try {
clientHandlers.put(requestId, handler);
Timeout timeoutX = null;
if (timeout != null) {
timeoutX = timerService.newTimeout(new TimeoutTimerTask(requestId), timeout);
}
clientHandlers.put(requestId, new RequestHolder<T>(handler, node, action, timeoutX));
transport.sendRequest(node, requestId, action, message, handler);
} catch (final Exception e) {
// usually happen either because we failed to connect to the node
@ -183,10 +185,105 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
}
public void registerHandler(String action, TransportRequestHandler handler) {
serverHandlers.put(action, handler);
TransportRequestHandler handlerReplaced = serverHandlers.put(action, handler);
if (handlerReplaced != null) {
logger.warn("Registered two transport handlers for action {}, handlers: {}, {}", action, handler, handlerReplaced);
}
}
public void removeHandler(String action) {
serverHandlers.remove(action);
}
class Adapter implements TransportServiceAdapter {
@Override public TransportRequestHandler handler(String action) {
return serverHandlers.get(action);
}
@Override public TransportResponseHandler remove(long requestId) {
RequestHolder holder = clientHandlers.remove(requestId);
if (holder == null) {
return null;
}
if (holder.timeout() != null) {
holder.timeout().cancel();
}
return holder.handler();
}
@Override public void raiseNodeConnected(DiscoveryNode node) {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeConnected(node);
}
}
@Override public void raiseNodeDisconnected(DiscoveryNode node) {
for (TransportConnectionListener connectionListener : connectionListeners) {
connectionListener.onNodeDisconnected(node);
}
// node got disconnected, raise disconnection on possible ongoing handlers
for (Map.Entry<Long, RequestHolder> entry : clientHandlers.entrySet()) {
RequestHolder holder = entry.getValue();
if (holder.node().equals(node)) {
holder = clientHandlers.remove(entry.getKey());
if (holder != null) {
holder.handler().handleException(new NodeDisconnectedTransportException(node, holder.action()));
}
}
}
}
}
class TimeoutTimerTask implements TimerTask {
private final long requestId;
TimeoutTimerTask(long requestId) {
this.requestId = requestId;
}
@Override public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
RequestHolder holder = clientHandlers.remove(requestId);
if (holder != null) {
holder.handler().handleException(new ReceiveTimeoutTransportException(holder.node(), holder.action()));
}
}
}
static class RequestHolder<T extends Streamable> {
private final TransportResponseHandler<T> handler;
private final DiscoveryNode node;
private final String action;
private final Timeout timeout;
RequestHolder(TransportResponseHandler<T> handler, DiscoveryNode node, String action, Timeout timeout) {
this.handler = handler;
this.node = node;
this.action = action;
this.timeout = timeout;
}
public TransportResponseHandler<T> handler() {
return handler;
}
public DiscoveryNode node() {
return this.node;
}
public String action() {
return this.action;
}
public Timeout timeout() {
return timeout;
}
}
}

View File

@ -193,13 +193,22 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
private void handleRequest(StreamInput stream, long requestId, LocalTransport sourceTransport) throws Exception {
final String action = stream.readUTF();
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, sourceTransport, action, requestId);
final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) {
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) {
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
}
final Streamable streamable = handler.newInstance();
streamable.readFrom(stream);
handler.messageReceived(streamable, transportChannel);
} catch (Exception e) {
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e);
logger.warn("Actual Exception", e1);
}
}
final Streamable streamable = handler.newInstance();
streamable.readFrom(stream);
handler.messageReceived(streamable, transportChannel);
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.transport.netty;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
@ -29,6 +28,7 @@ import org.elasticsearch.transport.*;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.io.NetworkUtils;
import org.elasticsearch.util.io.stream.BytesStreamOutput;
import org.elasticsearch.util.io.stream.HandlesStreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
@ -51,7 +51,6 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
@ -61,11 +60,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.collect.Lists.*;
import static org.elasticsearch.transport.Transport.Helper.*;
import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*;
import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
import static org.elasticsearch.util.io.HostResolver.*;
import static org.elasticsearch.util.io.NetworkUtils.*;
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.util.transport.NetworkExceptionHelper.*;
@ -94,8 +94,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
final int connectionsPerNode;
final int connectRetries;
final Boolean tcpNoDelay;
final Boolean tcpKeepAlive;
@ -138,10 +136,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
this.connectionsPerNode = componentSettings.getAsInt("connections_per_node", 5);
this.publishHost = componentSettings.get("publish_host");
this.connectTimeout = componentSettings.getAsTime("connect_timeout", timeValueSeconds(1));
this.connectRetries = componentSettings.getAsInt("connect_retries", 2);
this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", true);
this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", null);
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", null);
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", NetworkUtils.defaultReuseAddress());
this.tcpSendBufferSize = componentSettings.getAsSize("tcp_send_buffer_size", null);
this.tcpReceiveBufferSize = componentSettings.getAsSize("tcp_receive_buffer_size", null);
}
@ -260,17 +257,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
InetSocketAddress publishAddress;
try {
InetAddress publishAddressX = resolvePublishHostAddress(publishHost, settings);
if (publishAddressX == null) {
// if its 0.0.0.0, we can't publish that.., default to the local ip address
if (boundAddress.getAddress().isAnyLocalAddress()) {
publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort());
} else {
publishAddress = boundAddress;
}
} else {
publishAddress = new InetSocketAddress(publishAddressX, boundAddress.getPort());
}
publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings), boundAddress.getPort());
} catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e);
}
@ -412,59 +399,35 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
if (nodeConnections != null) {
return;
}
// build connection(s) to the node
ArrayList<Channel> channels = new ArrayList<Channel>();
Throwable lastConnectException = null;
List<ChannelFuture> connectFutures = newArrayList();
for (int connectionIndex = 0; connectionIndex < connectionsPerNode; connectionIndex++) {
for (int i = 1; i <= connectRetries; i++) {
if (!lifecycle.started()) {
for (Channel channel1 : channels) {
channel1.close().awaitUninterruptibly();
}
throw new ConnectTransportException(node, "Can't connect when the transport is stopped");
}
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
ChannelFuture channelFuture = clientBootstrap.connect(address);
channelFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25));
if (!channelFuture.isSuccess()) {
// we failed to connect, check if we need to bail or retry
if (i == connectRetries && connectionIndex == 0) {
lastConnectException = channelFuture.getCause();
if (connectionIndex == 0) {
throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "]", lastConnectException);
} else {
// break out of the retry loop, try another connection
break;
}
} else {
logger.trace("Retry #[" + i + "], connect to [" + node + "]");
try {
channelFuture.getChannel().close();
} catch (Exception e) {
// ignore
}
continue;
}
}
// we got a connection, add it to our connections
Channel channel = channelFuture.getChannel();
if (!lifecycle.started()) {
channel.close();
for (Channel channel1 : channels) {
channel1.close().awaitUninterruptibly();
}
throw new ConnectTransportException(node, "Can't connect when the transport is stopped");
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
connectFutures.add(clientBootstrap.connect(address));
}
List<Channel> channels = newArrayList();
Throwable lastConnectException = null;
for (ChannelFuture connectFuture : connectFutures) {
if (!lifecycle.started()) {
for (Channel channel : channels) {
channel.close().awaitUninterruptibly();
}
throw new ConnectTransportException(node, "Can't connect when the transport is stopped");
}
connectFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25));
if (!connectFuture.isSuccess()) {
lastConnectException = connectFuture.getCause();
} else {
Channel channel = connectFuture.getChannel();
channel.getCloseFuture().addListener(new ChannelCloseListener(node.id()));
channels.add(channel);
break;
}
}
if (channels.isEmpty()) {
if (lastConnectException != null) {
throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "]", lastConnectException);
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", lastConnectException);
}
throw new ConnectTransportException(node, "connectTimeout[" + connectTimeout + "], connectRetries[" + connectRetries + "], reason unknown");
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "], reason unknown");
}
if (logger.isDebugEnabled()) {
logger.debug("Connected to node[{}], number_of_connections[{}]", node, channels.size());
@ -516,7 +479,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
private void channelClosed(Channel closedChannel) {
List<Channel> updated = Lists.newArrayList();
List<Channel> updated = newArrayList();
for (Channel channel : channels) {
if (!channel.getId().equals(closedChannel.getId())) {
updated.add(channel);

View File

@ -65,11 +65,6 @@ public class NettyTransportManagement {
return transport.connectTimeout.toString();
}
@ManagedAttribute(description = "Connect retries")
public int getConnectRetries() {
return transport.connectRetries;
}
@ManagedAttribute(description = "TcpNoDelay")
public Boolean getTcpNoDelay() {
return transport.tcpNoDelay;

View File

@ -1,141 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.io;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Enumeration;
/**
* @author kimchy (Shay Banon)
*/
public abstract class HostResolver {
public static final String GLOBAL_NETWORK_BINDHOST_SETTING = "network.bind_host";
public static final String GLOBAL_NETWORK_PUBLISHHOST_SETTING = "network.publish_host";
public static final String LOCAL_IP = "#local:ip#";
public static final String LOCAL_HOST = "#local:host#";
public static final String LOCAL_CANONICALHOST = "#local:canonicalhost#";
public static boolean isIPv4() {
return System.getProperty("java.net.preferIPv4Stack") != null && System.getProperty("java.net.preferIPv4Stack").equals("true");
}
public static InetAddress resolveBindHostAddress(String bindHost, Settings settings) throws IOException {
return resolveBindHostAddress(bindHost, settings, null);
}
public static InetAddress resolveBindHostAddress(String bindHost, Settings settings, String defaultValue2) throws IOException {
return resolveInetAddress(bindHost, settings.get(GLOBAL_NETWORK_BINDHOST_SETTING), defaultValue2);
}
public static InetAddress resolvePublishHostAddress(String publishHost, Settings settings) throws IOException {
return resolvePublishHostAddress(publishHost, settings, null);
}
public static InetAddress resolvePublishHostAddress(String publishHost, Settings settings, String defaultValue2) throws IOException {
return resolveInetAddress(publishHost, settings.get(GLOBAL_NETWORK_PUBLISHHOST_SETTING), defaultValue2);
}
public static InetAddress resolveInetAddress(String host, String defaultValue1, String defaultValue2) throws UnknownHostException, IOException {
String resolvedHost = resolveHost(host, defaultValue1, defaultValue2);
if (resolvedHost == null) {
return null;
}
return InetAddress.getByName(resolvedHost);
}
public static String resolveHost(String host, String defaultValue1, String defaultValue2) throws UnknownHostException, IOException {
if (host == null) {
host = defaultValue1;
}
if (host == null) {
host = defaultValue2;
}
if (host == null) {
return null;
}
if (host.startsWith("#") && host.endsWith("#")) {
host = host.substring(1, host.length() - 1);
if (host.equals("local:ip")) {
return InetAddress.getLocalHost().getHostAddress();
} else if (host.equalsIgnoreCase("local:host")) {
return InetAddress.getLocalHost().getHostName();
} else if (host.equalsIgnoreCase("local:canonicalhost")) {
return InetAddress.getLocalHost().getCanonicalHostName();
} else {
String name = host.substring(0, host.indexOf(':'));
String type = host.substring(host.indexOf(':') + 1);
Enumeration<NetworkInterface> niEnum;
try {
niEnum = NetworkInterface.getNetworkInterfaces();
} catch (SocketException e) {
throw new IOException("Failed to get network interfaces", e);
}
while (niEnum.hasMoreElements()) {
NetworkInterface ni = niEnum.nextElement();
if (name.equals(ni.getName()) || name.equals(ni.getDisplayName())) {
Enumeration<InetAddress> inetEnum = ni.getInetAddresses();
while (inetEnum.hasMoreElements()) {
InetAddress addr = inetEnum.nextElement();
if (addr.getHostAddress().equals("127.0.0.1")) {
// ignore local host
continue;
}
if (addr.getHostAddress().indexOf(".") == -1) {
// ignore address like 0:0:0:0:0:0:0:1
continue;
}
if ("host".equalsIgnoreCase(type)) {
return addr.getHostName();
} else if ("canonicalhost".equalsIgnoreCase(type)) {
return addr.getCanonicalHostName();
} else {
return addr.getHostAddress();
}
}
}
}
}
throw new IOException("Failed to find network interface for [" + host + "]");
}
InetAddress inetAddress = java.net.InetAddress.getByName(host);
String hostAddress = inetAddress.getHostAddress();
String hostName = inetAddress.getHostName();
String canonicalHostName = inetAddress.getCanonicalHostName();
if (host.equalsIgnoreCase(hostAddress)) {
return hostAddress;
} else if (host.equalsIgnoreCase(canonicalHostName)) {
return canonicalHostName;
} else {
return hostName; //resolve property into actual lower/upper case
}
}
private HostResolver() {
}
}

View File

@ -0,0 +1,286 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.io;
import org.elasticsearch.util.OsUtils;
import org.elasticsearch.util.logging.ESLogger;
import org.elasticsearch.util.logging.Loggers;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.net.*;
import java.util.*;
/**
* @author kimchy (shay.banon)
*/
public abstract class NetworkUtils {
private final static ESLogger logger = Loggers.getLogger(NetworkUtils.class);
public static enum StackType {
IPv4, IPv6, Unknown
}
public static final String IPv4_SETTING = "java.net.preferIPv4Stack";
public static final String IPv6_SETTING = "java.net.preferIPv6Addresses";
public static final String NON_LOOPBACK_ADDRESS = "non_loopback_address";
public static final String LOCAL = "#local#";
public static final String GLOBAL_NETWORK_BINDHOST_SETTING = "network.bind_host";
public static final String GLOBAL_NETWORK_PUBLISHHOST_SETTING = "network.publish_host";
private final static InetAddress localAddress;
static {
InetAddress localAddressX = null;
try {
localAddressX = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
logger.warn("Failed to find local host", e);
}
localAddress = localAddressX;
}
public static Boolean defaultReuseAddress() {
return OsUtils.WINDOWS ? null : true;
}
public static boolean isIPv4() {
return System.getProperty("java.net.preferIPv4Stack") != null && System.getProperty("java.net.preferIPv4Stack").equals("true");
}
public static InetAddress resolveBindHostAddress(String bindHost, Settings settings) throws IOException {
return resolveBindHostAddress(bindHost, settings, null);
}
public static InetAddress resolveBindHostAddress(String bindHost, Settings settings, String defaultValue2) throws IOException {
return resolveInetAddress(bindHost, settings.get(GLOBAL_NETWORK_BINDHOST_SETTING), defaultValue2);
}
public static InetAddress resolvePublishHostAddress(String publishHost, Settings settings) throws IOException {
InetAddress address = resolvePublishHostAddress(publishHost, settings, null);
// verify that its not a local address
if (address == null || address.isAnyLocalAddress()) {
address = localAddress;
}
return address;
}
public static InetAddress resolvePublishHostAddress(String publishHost, Settings settings, String defaultValue2) throws IOException {
return resolveInetAddress(publishHost, settings.get(GLOBAL_NETWORK_PUBLISHHOST_SETTING), defaultValue2);
}
public static InetAddress resolveInetAddress(String host, String defaultValue1, String defaultValue2) throws UnknownHostException, IOException {
if (host == null) {
host = defaultValue1;
}
if (host == null) {
host = defaultValue2;
}
if (host == null) {
return null;
}
if (host.startsWith("#") && host.endsWith("#")) {
host = host.substring(1, host.length() - 1);
if (host.equals("local")) {
return localAddress;
} else {
Collection<NetworkInterface> allInterfs = getAllAvailableInterfaces();
for (NetworkInterface ni : allInterfs) {
if (!ni.isUp() || ni.isLoopback()) {
continue;
}
if (host.equals(ni.getName()) || host.equals(ni.getDisplayName())) {
return getFirstNonLoopbackAddress(ni, getIpStackType());
}
}
}
throw new IOException("Failed to find network interface for [" + host + "]");
}
return InetAddress.getByName(host);
}
public static InetAddress getIPv4Localhost() throws UnknownHostException {
return getLocalhost(StackType.IPv4);
}
public static InetAddress getIPv6Localhost() throws UnknownHostException {
return getLocalhost(StackType.IPv6);
}
public static InetAddress getLocalhost(StackType ip_version) throws UnknownHostException {
if (ip_version == StackType.IPv4)
return InetAddress.getByName("127.0.0.1");
else
return InetAddress.getByName("::1");
}
/**
* Returns the first non-loopback address on any interface on the current host.
*
* @param ip_version Constraint on IP version of address to be returned, 4 or 6
*/
public static InetAddress getFirstNonLoopbackAddress(StackType ip_version) throws SocketException {
InetAddress address = null;
Enumeration intfs = NetworkInterface.getNetworkInterfaces();
while (intfs.hasMoreElements()) {
NetworkInterface intf = (NetworkInterface) intfs.nextElement();
if (!intf.isUp() || intf.isLoopback())
continue;
address = getFirstNonLoopbackAddress(intf, ip_version);
if (address != null) {
return address;
}
}
return null;
}
/**
* Returns the first non-loopback address on the given interface on the current host.
*
* @param intf the interface to be checked
* @param ipVersion Constraint on IP version of address to be returned, 4 or 6
*/
public static InetAddress getFirstNonLoopbackAddress(NetworkInterface intf, StackType ipVersion) throws SocketException {
if (intf == null)
throw new IllegalArgumentException("Network interface pointer is null");
for (Enumeration addresses = intf.getInetAddresses(); addresses.hasMoreElements();) {
InetAddress address = (InetAddress) addresses.nextElement();
if (!address.isLoopbackAddress()) {
if ((address instanceof Inet4Address && ipVersion == StackType.IPv4) ||
(address instanceof Inet6Address && ipVersion == StackType.IPv6))
return address;
}
}
return null;
}
/**
* A function to check if an interface supports an IP version (i.e has addresses
* defined for that IP version).
*
* @param intf
* @return
*/
public static boolean interfaceHasIPAddresses(NetworkInterface intf, StackType ipVersion) throws SocketException, UnknownHostException {
boolean supportsVersion = false;
if (intf != null) {
// get all the InetAddresses defined on the interface
Enumeration addresses = intf.getInetAddresses();
while (addresses != null && addresses.hasMoreElements()) {
// get the next InetAddress for the current interface
InetAddress address = (InetAddress) addresses.nextElement();
// check if we find an address of correct version
if ((address instanceof Inet4Address && (ipVersion == StackType.IPv4)) ||
(address instanceof Inet6Address && (ipVersion == StackType.IPv6))) {
supportsVersion = true;
break;
}
}
} else {
throw new UnknownHostException("network interface " + intf + " not found");
}
return supportsVersion;
}
/**
* Tries to determine the type of IP stack from the available interfaces and their addresses and from the
* system properties (java.net.preferIPv4Stack and java.net.preferIPv6Addresses)
*
* @return StackType.IPv4 for an IPv4 only stack, StackYTypeIPv6 for an IPv6 only stack, and StackType.Unknown
* if the type cannot be detected
*/
public static StackType getIpStackType() {
boolean isIPv4StackAvailable = isStackAvailable(true);
boolean isIPv6StackAvailable = isStackAvailable(false);
// if only IPv4 stack available
if (isIPv4StackAvailable && !isIPv6StackAvailable) {
return StackType.IPv4;
}
// if only IPv6 stack available
else if (isIPv6StackAvailable && !isIPv4StackAvailable) {
return StackType.IPv6;
}
// if dual stack
else if (isIPv4StackAvailable && isIPv6StackAvailable) {
// get the System property which records user preference for a stack on a dual stack machine
if (Boolean.getBoolean(IPv4_SETTING)) // has preference over java.net.preferIPv6Addresses
return StackType.IPv4;
if (Boolean.getBoolean(IPv6_SETTING))
return StackType.IPv6;
return StackType.IPv6;
}
return StackType.Unknown;
}
public static boolean isStackAvailable(boolean ipv4) {
Collection<InetAddress> allAddrs = getAllAvailableAddresses();
for (InetAddress addr : allAddrs)
if (ipv4 && addr instanceof Inet4Address || (!ipv4 && addr instanceof Inet6Address))
return true;
return false;
}
public static List<NetworkInterface> getAllAvailableInterfaces() throws SocketException {
List<NetworkInterface> allInterfaces = new ArrayList<NetworkInterface>(10);
NetworkInterface intf;
for (Enumeration en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) {
intf = (NetworkInterface) en.nextElement();
allInterfaces.add(intf);
}
return allInterfaces;
}
public static Collection<InetAddress> getAllAvailableAddresses() {
Set<InetAddress> retval = new HashSet<InetAddress>();
Enumeration en;
try {
en = NetworkInterface.getNetworkInterfaces();
if (en == null)
return retval;
while (en.hasMoreElements()) {
NetworkInterface intf = (NetworkInterface) en.nextElement();
Enumeration<InetAddress> addrs = intf.getInetAddresses();
while (addrs.hasMoreElements())
retval.add(addrs.nextElement());
}
} catch (SocketException e) {
logger.warn("Failed to derive all available interfaces", e);
}
return retval;
}
private NetworkUtils() {
}
}

View File

@ -36,9 +36,13 @@ public class BytesStreamInput extends StreamInput {
protected int count;
public BytesStreamInput(byte buf[]) {
this(buf, 0, buf.length);
}
public BytesStreamInput(byte buf[], int position, int count) {
this.buf = buf;
this.pos = 0;
this.count = buf.length;
this.pos = position;
this.count = count;
}
@Override public byte readByte() throws IOException {

View File

@ -23,7 +23,7 @@ package org.elasticsearch.util.timer;
* A task which is executed after the delay specified with
* {@link Timer#newTimeout(TimerTask, long, java.util.concurrent.TimeUnit)}.
*
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public interface TimerTask {

View File

@ -0,0 +1,82 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.ping.multicast;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
import org.elasticsearch.util.TimeValue;
import org.testng.annotations.Test;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
@Test
public class MulticastZenPingTests {
@Test public void testSimplePings() {
ThreadPool threadPool = new CachedThreadPool();
TimerService timerService = new TimerService(threadPool);
ClusterName clusterName = new ClusterName("test");
final TransportService transportServiceA = new TransportService(new LocalTransport(threadPool), threadPool, timerService).start();
final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress());
final TransportService transportServiceB = new TransportService(new LocalTransport(threadPool), threadPool, timerService).start();
final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceA.boundAddress().publishAddress());
MulticastZenPing zenPingA = (MulticastZenPing) new MulticastZenPing(threadPool, transportServiceA, clusterName);
zenPingA.setNodesProvider(new DiscoveryNodesProvider() {
@Override public DiscoveryNodes nodes() {
return DiscoveryNodes.newNodesBuilder().put(nodeA).localNodeId("A").build();
}
});
zenPingA.start();
MulticastZenPing zenPingB = (MulticastZenPing) new MulticastZenPing(threadPool, transportServiceB, clusterName);
zenPingB.setNodesProvider(new DiscoveryNodesProvider() {
@Override public DiscoveryNodes nodes() {
return DiscoveryNodes.newNodesBuilder().put(nodeB).localNodeId("B").build();
}
});
zenPingB.start();
try {
ZenPing.PingResponse[] pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1));
assertThat(pingResponses.length, equalTo(1));
assertThat(pingResponses[0].target().id(), equalTo("B"));
} finally {
zenPingA.close();
zenPingB.close();
transportServiceA.close();
transportServiceB.close();
threadPool.shutdown();
}
}
}

View File

@ -21,7 +21,9 @@ package org.elasticsearch.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.scaling.ScalingThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.io.stream.Streamable;
@ -42,6 +44,7 @@ import static org.hamcrest.Matchers.*;
public abstract class AbstractSimpleTransportTests {
protected ThreadPool threadPool;
protected TimerService timerService;
protected TransportService serviceA;
protected TransportService serviceB;
@ -49,7 +52,8 @@ public abstract class AbstractSimpleTransportTests {
protected DiscoveryNode serviceBNode;
@BeforeMethod public void setUp() {
threadPool = new ScalingThreadPool();
threadPool = new CachedThreadPool();
timerService = new TimerService(threadPool);
build();
serviceA.connectToNode(serviceBNode);
serviceB.connectToNode(serviceANode);
@ -106,6 +110,8 @@ public abstract class AbstractSimpleTransportTests {
assertThat(e.getMessage(), false, equalTo(true));
}
serviceA.removeHandler("sayHello");
System.out.println("after ...");
}
@ -144,6 +150,8 @@ public abstract class AbstractSimpleTransportTests {
assertThat("bad message !!!", equalTo(e.getCause().getMessage()));
}
serviceA.removeHandler("sayHelloException");
System.out.println("after ...");
}
@ -162,7 +170,53 @@ public abstract class AbstractSimpleTransportTests {
};
serviceA.addConnectionListener(disconnectListener);
serviceB.close();
assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true));
assertThat(latch.await(5, TimeUnit.SECONDS), equalTo(true));
}
@Test public void testTimeoutSendException() throws Exception {
serviceA.registerHandler("sayHelloTimeout", new BaseTransportRequestHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
// don't send back a response
// try {
// channel.sendResponse(new StringMessage("hello " + request.message));
// } catch (IOException e) {
// e.printStackTrace();
// assertThat(e.getMessage(), false, equalTo(true));
// }
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHelloTimeout",
new StringMessage("moshe"), TimeValue.timeValueMillis(100), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void handleResponse(StringMessage response) {
assertThat("got response instead of exception", false, equalTo(true));
}
@Override public void handleException(RemoteTransportException exp) {
assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class));
}
});
try {
StringMessage message = res.txGet();
assertThat("exception should be thrown", false, equalTo(true));
} catch (Exception e) {
assertThat(e, instanceOf(ReceiveTimeoutTransportException.class));
}
serviceA.removeHandler("sayHelloTimeout");
System.out.println("after ...");
}
private class StringMessage implements Streamable {

View File

@ -28,10 +28,10 @@ import org.testng.annotations.Test;
public class SimpleLocalTransportTests extends AbstractSimpleTransportTests {
@Override protected void build() {
serviceA = new TransportService(new LocalTransport(threadPool), threadPool).start();
serviceA = new TransportService(new LocalTransport(threadPool), threadPool, timerService).start();
serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress());
serviceB = new TransportService(new LocalTransport(threadPool), threadPool).start();
serviceB = new TransportService(new LocalTransport(threadPool), threadPool, timerService).start();
serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress());
}
}

View File

@ -30,11 +30,10 @@ import static org.elasticsearch.util.settings.ImmutableSettings.*;
public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
@Override protected void build() {
serviceA = new TransportService(settingsBuilder().put("name", "A").build(), new NettyTransport(settingsBuilder().put("name", "A").build(), threadPool), threadPool).start();
serviceA = new TransportService(settingsBuilder().put("name", "A").build(), new NettyTransport(settingsBuilder().put("name", "A").build(), threadPool), threadPool, timerService).start();
serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress());
serviceB = new TransportService(settingsBuilder().put("name", "B").build(), new NettyTransport(settingsBuilder().put("name", "B").build(), threadPool), threadPool).start();
serviceB = new TransportService(settingsBuilder().put("name", "B").build(), new NettyTransport(settingsBuilder().put("name", "B").build(), threadPool), threadPool, timerService).start();
serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress());
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.transport.netty.benchmark;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportService;
@ -56,8 +57,9 @@ public class BenchmarkNettyClient {
.put("transport.netty.connectionsPerNode", 5)
.build();
final ThreadPool threadPool = new CachedThreadPool();
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
final ThreadPool threadPool = new CachedThreadPool(settings);
final TimerService timerService = new TimerService(settings, threadPool);
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool, timerService).start();
final DiscoveryNode node = new DiscoveryNode("server", new InetSocketTransportAddress("localhost", 9999));

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty.benchmark;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportService;
@ -40,8 +41,9 @@ public class BenchmarkNettyServer {
.put("transport.netty.port", 9999)
.build();
final ThreadPool threadPool = new CachedThreadPool();
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool).start();
final ThreadPool threadPool = new CachedThreadPool(settings);
final TimerService timerService = new TimerService(settings, threadPool);
final TransportService transportService = new TransportService(new NettyTransport(settings, threadPool), threadPool, timerService).start();
transportService.registerHandler("benchmark", new BaseTransportRequestHandler<BenchmarkMessage>() {
@Override public BenchmarkMessage newInstance() {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.io.NetworkUtils;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.BoundTransportAddress;
import org.elasticsearch.util.transport.InetSocketTransportAddress;
@ -48,7 +49,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
import static org.elasticsearch.util.io.HostResolver.*;
import static org.elasticsearch.util.io.NetworkUtils.*;
/**
* @author kimchy (shay.banon)
@ -101,7 +102,7 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<Me
this.publishHost = componentSettings.get("publish_host");
this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", true);
this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", null);
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", null);
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", NetworkUtils.defaultReuseAddress());
this.tcpSendBufferSize = componentSettings.getAsSize("tcp_send_buffer_size", null);
this.tcpReceiveBufferSize = componentSettings.getAsSize("tcp_receive_buffer_size", null);
}
@ -176,17 +177,7 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<Me
InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
InetSocketAddress publishAddress;
try {
InetAddress publishAddressX = resolvePublishHostAddress(publishHost, settings);
if (publishAddressX == null) {
// if its 0.0.0.0, we can't publish that.., default to the local ip address
if (boundAddress.getAddress().isAnyLocalAddress()) {
publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings, LOCAL_IP), boundAddress.getPort());
} else {
publishAddress = boundAddress;
}
} else {
publishAddress = new InetSocketAddress(publishAddressX, boundAddress.getPort());
}
publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings), boundAddress.getPort());
} catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e);
}