make resolving of "logical" (#...#) hosts pluggable (so we can later support something like #cloud:privateip#

This commit is contained in:
kimchy 2010-05-03 00:50:44 +03:00
parent 0fcc9a43c5
commit 2d20ba0b42
12 changed files with 226 additions and 102 deletions

View File

@ -59,6 +59,7 @@ import org.elasticsearch.util.gcommon.collect.ImmutableList;
import org.elasticsearch.util.guice.inject.Guice; import org.elasticsearch.util.guice.inject.Guice;
import org.elasticsearch.util.guice.inject.Injector; import org.elasticsearch.util.guice.inject.Injector;
import org.elasticsearch.util.guice.inject.Module; import org.elasticsearch.util.guice.inject.Module;
import org.elasticsearch.util.network.NetworkModule;
import org.elasticsearch.util.settings.ImmutableSettings; import org.elasticsearch.util.settings.ImmutableSettings;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.settings.SettingsModule; import org.elasticsearch.util.settings.SettingsModule;
@ -127,6 +128,7 @@ public class TransportClient implements InternalClient {
ArrayList<Module> modules = new ArrayList<Module>(); ArrayList<Module> modules = new ArrayList<Module>();
modules.add(new EnvironmentModule(environment)); modules.add(new EnvironmentModule(environment));
modules.add(new SettingsModule(settings)); modules.add(new SettingsModule(settings));
modules.add(new NetworkModule());
modules.add(new ClusterNameModule(settings)); modules.add(new ClusterNameModule(settings));
modules.add(new TimerModule()); modules.add(new TimerModule());
modules.add(new ThreadPoolModule(settings)); modules.add(new ThreadPoolModule(settings));

View File

@ -19,8 +19,6 @@
package org.elasticsearch.discovery.zen.ping; package org.elasticsearch.discovery.zen.ping;
import org.elasticsearch.util.gcommon.collect.ImmutableList;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
@ -32,6 +30,9 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.gcommon.collect.ImmutableList;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.network.NetworkService;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -47,12 +48,12 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
private volatile ImmutableList<? extends ZenPing> zenPings = ImmutableList.of(); private volatile ImmutableList<? extends ZenPing> zenPings = ImmutableList.of();
@Inject public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { @Inject public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService) {
super(settings); super(settings);
ImmutableList.Builder<ZenPing> zenPingsBuilder = ImmutableList.builder(); ImmutableList.Builder<ZenPing> zenPingsBuilder = ImmutableList.builder();
if (componentSettings.getAsBoolean("multicast.enabled", true)) { if (componentSettings.getAsBoolean("multicast.enabled", true)) {
zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName)); zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService));
} }
if (componentSettings.get("unicast.hosts") != null || componentSettings.getAsArray("unicast.hosts").length > 0) { if (componentSettings.get("unicast.hosts") != null || componentSettings.getAsArray("unicast.hosts").length > 0) {
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName)); zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName));

View File

@ -33,7 +33,7 @@ import org.elasticsearch.transport.*;
import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.io.stream.*; import org.elasticsearch.util.io.stream.*;
import org.elasticsearch.util.settings.ImmutableSettings; import org.elasticsearch.util.network.NetworkService;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import java.io.IOException; import java.io.IOException;
@ -49,7 +49,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.cluster.node.DiscoveryNode.*; import static org.elasticsearch.cluster.node.DiscoveryNode.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.concurrent.ConcurrentMaps.*;
import static org.elasticsearch.util.concurrent.DynamicExecutors.*; import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
import static org.elasticsearch.util.io.NetworkUtils.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -72,6 +72,8 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
private final ClusterName clusterName; private final ClusterName clusterName;
private final NetworkService networkService;
private volatile DiscoveryNodesProvider nodesProvider; private volatile DiscoveryNodesProvider nodesProvider;
@ -94,14 +96,15 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
private final Object receiveMutex = new Object(); private final Object receiveMutex = new Object();
public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { public MulticastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
this(ImmutableSettings.Builder.EMPTY_SETTINGS, threadPool, transportService, clusterName); this(EMPTY_SETTINGS, threadPool, transportService, clusterName, new NetworkService(EMPTY_SETTINGS));
} }
public MulticastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { public MulticastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService) {
super(settings); super(settings);
this.threadPool = threadPool; this.threadPool = threadPool;
this.transportService = transportService; this.transportService = transportService;
this.clusterName = clusterName; this.clusterName = clusterName;
this.networkService = networkService;
this.address = componentSettings.get("address"); this.address = componentSettings.get("address");
this.port = componentSettings.getAsInt("port", 54328); this.port = componentSettings.getAsInt("port", 54328);
@ -134,7 +137,7 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
multicastSocket.bind(new InetSocketAddress(port)); multicastSocket.bind(new InetSocketAddress(port));
multicastSocket.setTimeToLive(ttl); multicastSocket.setTimeToLive(ttl);
// set the send interface // set the send interface
InetAddress multicastInterface = resolvePublishHostAddress(address, settings); InetAddress multicastInterface = networkService.resolvePublishHostAddress(address);
multicastSocket.setInterface(multicastInterface); multicastSocket.setInterface(multicastInterface);
multicastSocket.setReceiveBufferSize(bufferSize); multicastSocket.setReceiveBufferSize(bufferSize);
multicastSocket.setSendBufferSize(bufferSize); multicastSocket.setSendBufferSize(bufferSize);

View File

@ -19,7 +19,6 @@
package org.elasticsearch.http.netty; package org.elasticsearch.http.netty;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.http.*; import org.elasticsearch.http.*;
import org.elasticsearch.transport.BindTransportException; import org.elasticsearch.transport.BindTransportException;
@ -27,7 +26,9 @@ import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
import org.elasticsearch.util.SizeUnit; import org.elasticsearch.util.SizeUnit;
import org.elasticsearch.util.SizeValue; import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.io.NetworkUtils; import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.network.NetworkService;
import org.elasticsearch.util.network.NetworkUtils;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.BoundTransportAddress; import org.elasticsearch.util.transport.BoundTransportAddress;
import org.elasticsearch.util.transport.InetSocketTransportAddress; import org.elasticsearch.util.transport.InetSocketTransportAddress;
@ -50,7 +51,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.util.concurrent.DynamicExecutors.*; import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
import static org.elasticsearch.util.io.NetworkUtils.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -65,6 +65,8 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
}); });
} }
private final NetworkService networkService;
private final SizeValue maxContentLength; private final SizeValue maxContentLength;
private final int workerCount; private final int workerCount;
@ -95,8 +97,9 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
private volatile HttpServerAdapter httpServerAdapter; private volatile HttpServerAdapter httpServerAdapter;
@Inject public NettyHttpServerTransport(Settings settings) { @Inject public NettyHttpServerTransport(Settings settings, NetworkService networkService) {
super(settings); super(settings);
this.networkService = networkService;
SizeValue maxContentLength = componentSettings.getAsSize("max_content_length", new SizeValue(100, SizeUnit.MB)); SizeValue maxContentLength = componentSettings.getAsSize("max_content_length", new SizeValue(100, SizeUnit.MB));
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors()); this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
this.port = componentSettings.get("port", "9200-9300"); this.port = componentSettings.get("port", "9200-9300");
@ -164,7 +167,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
// Bind and start to accept incoming connections. // Bind and start to accept incoming connections.
InetAddress hostAddressX; InetAddress hostAddressX;
try { try {
hostAddressX = resolveBindHostAddress(bindHost, settings); hostAddressX = networkService.resolveBindHostAddress(bindHost);
} catch (IOException e) { } catch (IOException e) {
throw new BindHttpException("Failed to resolve host [" + bindHost + "]", e); throw new BindHttpException("Failed to resolve host [" + bindHost + "]", e);
} }
@ -190,7 +193,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress(); InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
InetSocketAddress publishAddress; InetSocketAddress publishAddress;
try { try {
publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings), boundAddress.getPort()); publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), boundAddress.getPort());
} catch (Exception e) { } catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e); throw new BindTransportException("Failed to resolve publish address", e);
} }

View File

@ -19,8 +19,8 @@
package org.elasticsearch.jmx; package org.elasticsearch.jmx;
import org.elasticsearch.util.io.NetworkUtils;
import org.elasticsearch.util.logging.ESLogger; import org.elasticsearch.util.logging.ESLogger;
import org.elasticsearch.util.network.NetworkService;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.PortsRange; import org.elasticsearch.util.transport.PortsRange;
@ -91,7 +91,7 @@ public class JmxService {
return this.publishUrl; return this.publishUrl;
} }
public void connectAndRegister(String nodeDescription) { public void connectAndRegister(String nodeDescription, final NetworkService networkService) {
if (started) { if (started) {
return; return;
} }
@ -114,7 +114,7 @@ public class JmxService {
connectorServer.start(); connectorServer.start();
// create the publish url // create the publish url
String publishHost = NetworkUtils.resolvePublishHostAddress(settings.get("jmx.publishHost"), settings).getHostAddress(); String publishHost = networkService.resolvePublishHostAddress(settings.get("jmx.publishHost")).getHostAddress();
publishUrl = settings.get("jmx.publishUrl", JMXRMI_PUBLISH_URI_PATTERN).replace("{jmx.port}", Integer.toString(portNumber)).replace("{jmx.host}", publishHost); publishUrl = settings.get("jmx.publishUrl", JMXRMI_PUBLISH_URI_PATTERN).replace("{jmx.port}", Integer.toString(portNumber)).replace("{jmx.host}", publishHost);
} catch (Exception e) { } catch (Exception e) {
lastException.set(e); lastException.set(e);

View File

@ -19,9 +19,6 @@
package org.elasticsearch.node.internal; package org.elasticsearch.node.internal;
import org.elasticsearch.util.guice.inject.Guice;
import org.elasticsearch.util.guice.inject.Injector;
import org.elasticsearch.util.guice.inject.Module;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.TransportActionModule; import org.elasticsearch.action.TransportActionModule;
@ -65,9 +62,14 @@ import org.elasticsearch.util.Tuple;
import org.elasticsearch.util.component.Lifecycle; import org.elasticsearch.util.component.Lifecycle;
import org.elasticsearch.util.component.LifecycleComponent; import org.elasticsearch.util.component.LifecycleComponent;
import org.elasticsearch.util.guice.Injectors; import org.elasticsearch.util.guice.Injectors;
import org.elasticsearch.util.guice.inject.Guice;
import org.elasticsearch.util.guice.inject.Injector;
import org.elasticsearch.util.guice.inject.Module;
import org.elasticsearch.util.io.FileSystemUtils; import org.elasticsearch.util.io.FileSystemUtils;
import org.elasticsearch.util.logging.ESLogger; import org.elasticsearch.util.logging.ESLogger;
import org.elasticsearch.util.logging.Loggers; import org.elasticsearch.util.logging.Loggers;
import org.elasticsearch.util.network.NetworkModule;
import org.elasticsearch.util.network.NetworkService;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.settings.SettingsModule; import org.elasticsearch.util.settings.SettingsModule;
@ -111,6 +113,7 @@ public final class InternalNode implements Node {
ArrayList<Module> modules = new ArrayList<Module>(); ArrayList<Module> modules = new ArrayList<Module>();
modules.add(new PluginsModule(settings, pluginsService)); modules.add(new PluginsModule(settings, pluginsService));
modules.add(new NodeModule(this)); modules.add(new NodeModule(this));
modules.add(new NetworkModule());
modules.add(new JmxModule(settings)); modules.add(new JmxModule(settings));
modules.add(new EnvironmentModule(environment)); modules.add(new EnvironmentModule(environment));
modules.add(new ClusterNameModule(settings)); modules.add(new ClusterNameModule(settings));
@ -175,7 +178,7 @@ public final class InternalNode implements Node {
if (settings.getAsBoolean("http.enabled", true)) { if (settings.getAsBoolean("http.enabled", true)) {
injector.getInstance(HttpServer.class).start(); injector.getInstance(HttpServer.class).start();
} }
injector.getInstance(JmxService.class).connectAndRegister(discoService.nodeDescription()); injector.getInstance(JmxService.class).connectAndRegister(discoService.nodeDescription(), injector.getInstance(NetworkService.class));
logger.info("{{}}[{}]: Started", Version.full(), JvmConfig.jvmConfig().pid()); logger.info("{{}}[{}]: Started", Version.full(), JvmConfig.jvmConfig().pid());

View File

@ -19,21 +19,22 @@
package org.elasticsearch.transport.netty; package org.elasticsearch.transport.netty;
import org.elasticsearch.util.Strings;
import org.elasticsearch.util.gcommon.collect.Lists;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import org.elasticsearch.util.SizeValue; import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.Strings;
import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.io.NetworkUtils; import org.elasticsearch.util.gcommon.collect.Lists;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.io.stream.BytesStreamOutput; import org.elasticsearch.util.io.stream.BytesStreamOutput;
import org.elasticsearch.util.io.stream.HandlesStreamOutput; import org.elasticsearch.util.io.stream.HandlesStreamOutput;
import org.elasticsearch.util.io.stream.Streamable; import org.elasticsearch.util.io.stream.Streamable;
import org.elasticsearch.util.network.NetworkService;
import org.elasticsearch.util.network.NetworkUtils;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.BoundTransportAddress; import org.elasticsearch.util.transport.BoundTransportAddress;
import org.elasticsearch.util.transport.InetSocketTransportAddress; import org.elasticsearch.util.transport.InetSocketTransportAddress;
@ -63,12 +64,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.util.gcommon.collect.Lists.*;
import static org.elasticsearch.transport.Transport.Helper.*; import static org.elasticsearch.transport.Transport.Helper.*;
import static org.elasticsearch.util.TimeValue.*; import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.concurrent.ConcurrentMaps.*;
import static org.elasticsearch.util.concurrent.DynamicExecutors.*; import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
import static org.elasticsearch.util.io.NetworkUtils.*; import static org.elasticsearch.util.gcommon.collect.Lists.*;
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.util.transport.NetworkExceptionHelper.*; import static org.elasticsearch.util.transport.NetworkExceptionHelper.*;
@ -85,6 +85,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}); });
} }
private final NetworkService networkService;
final int workerCount; final int workerCount;
final String port; final String port;
@ -126,12 +128,17 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private volatile BoundTransportAddress boundAddress; private volatile BoundTransportAddress boundAddress;
public NettyTransport(ThreadPool threadPool) { public NettyTransport(ThreadPool threadPool) {
this(EMPTY_SETTINGS, threadPool); this(EMPTY_SETTINGS, threadPool, new NetworkService(EMPTY_SETTINGS));
} }
@Inject public NettyTransport(Settings settings, ThreadPool threadPool) { public NettyTransport(Settings settings, ThreadPool threadPool) {
this(settings, threadPool, new NetworkService(settings));
}
@Inject public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService) {
super(settings); super(settings);
this.threadPool = threadPool; this.threadPool = threadPool;
this.networkService = networkService;
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors()); this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
this.port = componentSettings.get("port", "9300-9400"); this.port = componentSettings.get("port", "9300-9400");
@ -232,7 +239,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
// Bind and start to accept incoming connections. // Bind and start to accept incoming connections.
InetAddress hostAddressX; InetAddress hostAddressX;
try { try {
hostAddressX = resolveBindHostAddress(bindHost, settings); hostAddressX = networkService.resolveBindHostAddress(bindHost);
} catch (IOException e) { } catch (IOException e) {
throw new BindTransportException("Failed to resolve host [" + bindHost + "]", e); throw new BindTransportException("Failed to resolve host [" + bindHost + "]", e);
} }
@ -260,7 +267,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress(); InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
InetSocketAddress publishAddress; InetSocketAddress publishAddress;
try { try {
publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings), boundAddress.getPort()); publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), boundAddress.getPort());
} catch (Exception e) { } catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e); throw new BindTransportException("Failed to resolve publish address", e);
} }
@ -337,7 +344,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
} }
String host = address.substring(0, index); String host = address.substring(0, index);
int port = Integer.parseInt(address.substring(index + 1)); int port = Integer.parseInt(address.substring(index + 1));
return new TransportAddress[] {new InetSocketTransportAddress(host, port)}; return new TransportAddress[]{new InetSocketTransportAddress(host, port)};
} }
} }

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.network;
import org.elasticsearch.util.guice.inject.AbstractModule;
/**
* @author kimchy (shay.banon)
*/
public class NetworkModule extends AbstractModule {
@Override protected void configure() {
bind(NetworkService.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.network;
import org.elasticsearch.util.MapBuilder;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.gcommon.collect.ImmutableMap;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.Collection;
/**
* @author kimchy (shay.banon)
*/
public class NetworkService extends AbstractComponent {
public static final String LOCAL = "#local#";
public static final String GLOBAL_NETWORK_BINDHOST_SETTING = "network.bind_host";
public static final String GLOBAL_NETWORK_PUBLISHHOST_SETTING = "network.publish_host";
public static interface CustomNameResolver {
InetAddress resolve();
}
private volatile ImmutableMap<String, CustomNameResolver> customNameResolvers = ImmutableMap.of();
@Inject public NetworkService(Settings settings) {
super(settings);
}
public void addCustomNameResolver(String name, CustomNameResolver customNameResolver) {
if (!(name.startsWith("#") && name.endsWith("#"))) {
name = "#" + name + "#";
}
customNameResolvers = MapBuilder.<String, CustomNameResolver>newMapBuilder().putAll(customNameResolvers).put(name, customNameResolver).immutableMap();
}
public InetAddress resolveBindHostAddress(String bindHost) throws IOException {
return resolveBindHostAddress(bindHost, null);
}
public InetAddress resolveBindHostAddress(String bindHost, String defaultValue2) throws IOException {
return resolveInetAddress(bindHost, settings.get(GLOBAL_NETWORK_BINDHOST_SETTING), defaultValue2);
}
public InetAddress resolvePublishHostAddress(String publishHost) throws IOException {
InetAddress address = resolvePublishHostAddress(publishHost, null);
// verify that its not a local address
if (address == null || address.isAnyLocalAddress()) {
address = NetworkUtils.getLocalAddress();
}
return address;
}
public InetAddress resolvePublishHostAddress(String publishHost, String defaultValue2) throws IOException {
return resolveInetAddress(publishHost, settings.get(GLOBAL_NETWORK_PUBLISHHOST_SETTING), defaultValue2);
}
public InetAddress resolveInetAddress(String host, String defaultValue1, String defaultValue2) throws UnknownHostException, IOException {
if (host == null) {
host = defaultValue1;
}
if (host == null) {
host = defaultValue2;
}
if (host == null) {
return null;
}
if (host.startsWith("#") && host.endsWith("#")) {
host = host.substring(1, host.length() - 1);
CustomNameResolver customNameResolver = customNameResolvers.get(host);
if (customNameResolver != null) {
return customNameResolver.resolve();
}
if (host.equals("local")) {
return NetworkUtils.getLocalAddress();
} else {
Collection<NetworkInterface> allInterfs = NetworkUtils.getAllAvailableInterfaces();
for (NetworkInterface ni : allInterfs) {
if (!ni.isUp() || ni.isLoopback()) {
continue;
}
if (host.equals(ni.getName()) || host.equals(ni.getDisplayName())) {
return NetworkUtils.getFirstNonLoopbackAddress(ni, NetworkUtils.getIpStackType());
}
}
}
throw new IOException("Failed to find network interface for [" + host + "]");
}
return InetAddress.getByName(host);
}
}

View File

@ -17,14 +17,12 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.util.io; package org.elasticsearch.util.network;
import org.elasticsearch.util.OsUtils; import org.elasticsearch.util.OsUtils;
import org.elasticsearch.util.logging.ESLogger; import org.elasticsearch.util.logging.ESLogger;
import org.elasticsearch.util.logging.Loggers; import org.elasticsearch.util.logging.Loggers;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.net.*; import java.net.*;
import java.util.*; import java.util.*;
@ -43,10 +41,6 @@ public abstract class NetworkUtils {
public static final String IPv6_SETTING = "java.net.preferIPv6Addresses"; public static final String IPv6_SETTING = "java.net.preferIPv6Addresses";
public static final String NON_LOOPBACK_ADDRESS = "non_loopback_address"; public static final String NON_LOOPBACK_ADDRESS = "non_loopback_address";
public static final String LOCAL = "#local#";
public static final String GLOBAL_NETWORK_BINDHOST_SETTING = "network.bind_host";
public static final String GLOBAL_NETWORK_PUBLISHHOST_SETTING = "network.publish_host";
private final static InetAddress localAddress; private final static InetAddress localAddress;
@ -68,56 +62,6 @@ public abstract class NetworkUtils {
return System.getProperty("java.net.preferIPv4Stack") != null && System.getProperty("java.net.preferIPv4Stack").equals("true"); return System.getProperty("java.net.preferIPv4Stack") != null && System.getProperty("java.net.preferIPv4Stack").equals("true");
} }
public static InetAddress resolveBindHostAddress(String bindHost, Settings settings) throws IOException {
return resolveBindHostAddress(bindHost, settings, null);
}
public static InetAddress resolveBindHostAddress(String bindHost, Settings settings, String defaultValue2) throws IOException {
return resolveInetAddress(bindHost, settings.get(GLOBAL_NETWORK_BINDHOST_SETTING), defaultValue2);
}
public static InetAddress resolvePublishHostAddress(String publishHost, Settings settings) throws IOException {
InetAddress address = resolvePublishHostAddress(publishHost, settings, null);
// verify that its not a local address
if (address == null || address.isAnyLocalAddress()) {
address = localAddress;
}
return address;
}
public static InetAddress resolvePublishHostAddress(String publishHost, Settings settings, String defaultValue2) throws IOException {
return resolveInetAddress(publishHost, settings.get(GLOBAL_NETWORK_PUBLISHHOST_SETTING), defaultValue2);
}
public static InetAddress resolveInetAddress(String host, String defaultValue1, String defaultValue2) throws UnknownHostException, IOException {
if (host == null) {
host = defaultValue1;
}
if (host == null) {
host = defaultValue2;
}
if (host == null) {
return null;
}
if (host.startsWith("#") && host.endsWith("#")) {
host = host.substring(1, host.length() - 1);
if (host.equals("local")) {
return localAddress;
} else {
Collection<NetworkInterface> allInterfs = getAllAvailableInterfaces();
for (NetworkInterface ni : allInterfs) {
if (!ni.isUp() || ni.isLoopback()) {
continue;
}
if (host.equals(ni.getName()) || host.equals(ni.getDisplayName())) {
return getFirstNonLoopbackAddress(ni, getIpStackType());
}
}
}
throw new IOException("Failed to find network interface for [" + host + "]");
}
return InetAddress.getByName(host);
}
public static InetAddress getIPv4Localhost() throws UnknownHostException { public static InetAddress getIPv4Localhost() throws UnknownHostException {
return getLocalhost(StackType.IPv4); return getLocalhost(StackType.IPv4);
@ -127,6 +71,10 @@ public abstract class NetworkUtils {
return getLocalhost(StackType.IPv6); return getLocalhost(StackType.IPv6);
} }
public static InetAddress getLocalAddress() {
return localAddress;
}
public static InetAddress getLocalhost(StackType ip_version) throws UnknownHostException { public static InetAddress getLocalhost(StackType ip_version) throws UnknownHostException {
if (ip_version == StackType.IPv4) if (ip_version == StackType.IPv4)
return InetAddress.getByName("127.0.0.1"); return InetAddress.getByName("127.0.0.1");

View File

@ -19,7 +19,6 @@
package org.elasticsearch.discovery.jgroups; package org.elasticsearch.discovery.jgroups;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.*;
@ -31,9 +30,11 @@ import org.elasticsearch.discovery.InitialStateDiscoveryListener;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.io.NetworkUtils; import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.io.stream.BytesStreamInput; import org.elasticsearch.util.io.stream.BytesStreamInput;
import org.elasticsearch.util.io.stream.BytesStreamOutput; import org.elasticsearch.util.io.stream.BytesStreamOutput;
import org.elasticsearch.util.network.NetworkService;
import org.elasticsearch.util.network.NetworkUtils;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import org.jgroups.*; import org.jgroups.*;
@ -47,10 +48,10 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.util.gcommon.collect.Maps.*;
import static org.elasticsearch.util.gcommon.collect.Sets.*;
import static org.elasticsearch.cluster.ClusterState.*; import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.node.DiscoveryNode.*; import static org.elasticsearch.cluster.node.DiscoveryNode.*;
import static org.elasticsearch.util.gcommon.collect.Maps.*;
import static org.elasticsearch.util.gcommon.collect.Sets.*;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (Shay Banon)
@ -67,6 +68,8 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent<Discovery> impl
private final ClusterService clusterService; private final ClusterService clusterService;
private final NetworkService networkService;
private final Channel channel; private final Channel channel;
private volatile boolean addressSet = false; private volatile boolean addressSet = false;
@ -80,11 +83,12 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent<Discovery> impl
private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<InitialStateDiscoveryListener>(); private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<InitialStateDiscoveryListener>();
@Inject public JgroupsDiscovery(Settings settings, Environment environment, ClusterName clusterName, @Inject public JgroupsDiscovery(Settings settings, Environment environment, ClusterName clusterName,
TransportService transportService, ClusterService clusterService) { TransportService transportService, ClusterService clusterService, NetworkService networkService) {
super(settings); super(settings);
this.clusterName = clusterName; this.clusterName = clusterName;
this.transportService = transportService; this.transportService = transportService;
this.clusterService = clusterService; this.clusterService = clusterService;
this.networkService = networkService;
String config = componentSettings.get("config", "udp"); String config = componentSettings.get("config", "udp");
String actualConfig = config; String actualConfig = config;
@ -110,7 +114,7 @@ public class JgroupsDiscovery extends AbstractLifecycleComponent<Discovery> impl
if (System.getProperty("jgroups.bind_addr") == null) { if (System.getProperty("jgroups.bind_addr") == null) {
// automatically set the bind address based on ElasticSearch default bindings... // automatically set the bind address based on ElasticSearch default bindings...
try { try {
InetAddress bindAddress = NetworkUtils.resolveBindHostAddress(null, settings, NetworkUtils.LOCAL); InetAddress bindAddress = networkService.resolveBindHostAddress(null, NetworkService.LOCAL);
if ((bindAddress instanceof Inet4Address && NetworkUtils.isIPv4()) || (bindAddress instanceof Inet6Address && !NetworkUtils.isIPv4())) { if ((bindAddress instanceof Inet4Address && NetworkUtils.isIPv4()) || (bindAddress instanceof Inet6Address && !NetworkUtils.isIPv4())) {
sysPropsSet.put("jgroups.bind_addr", bindAddress.getHostAddress()); sysPropsSet.put("jgroups.bind_addr", bindAddress.getHostAddress());
System.setProperty("jgroups.bind_addr", bindAddress.getHostAddress()); System.setProperty("jgroups.bind_addr", bindAddress.getHostAddress());

View File

@ -19,7 +19,6 @@
package org.elasticsearch.memcached.netty; package org.elasticsearch.memcached.netty;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.http.BindHttpException; import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.memcached.MemcachedServerTransport; import org.elasticsearch.memcached.MemcachedServerTransport;
@ -28,7 +27,9 @@ import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory; import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
import org.elasticsearch.util.SizeValue; import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.component.AbstractLifecycleComponent; import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.io.NetworkUtils; import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.network.NetworkService;
import org.elasticsearch.util.network.NetworkUtils;
import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.BoundTransportAddress; import org.elasticsearch.util.transport.BoundTransportAddress;
import org.elasticsearch.util.transport.InetSocketTransportAddress; import org.elasticsearch.util.transport.InetSocketTransportAddress;
@ -49,7 +50,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.util.concurrent.DynamicExecutors.*; import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
import static org.elasticsearch.util.io.NetworkUtils.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -66,6 +66,8 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<Me
private final RestController restController; private final RestController restController;
private final NetworkService networkService;
private final int workerCount; private final int workerCount;
private final String port; private final String port;
@ -92,9 +94,10 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<Me
private volatile OpenChannelsHandler serverOpenChannels; private volatile OpenChannelsHandler serverOpenChannels;
@Inject public NettyMemcachedServerTransport(Settings settings, RestController restController) { @Inject public NettyMemcachedServerTransport(Settings settings, RestController restController, NetworkService networkService) {
super(settings); super(settings);
this.restController = restController; this.restController = restController;
this.networkService = networkService;
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors()); this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
this.port = componentSettings.get("port", "11211-11311"); this.port = componentSettings.get("port", "11211-11311");
@ -151,7 +154,7 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<Me
// Bind and start to accept incoming connections. // Bind and start to accept incoming connections.
InetAddress hostAddressX; InetAddress hostAddressX;
try { try {
hostAddressX = resolveBindHostAddress(bindHost, settings); hostAddressX = networkService.resolveBindHostAddress(bindHost);
} catch (IOException e) { } catch (IOException e) {
throw new BindHttpException("Failed to resolve host [" + bindHost + "]", e); throw new BindHttpException("Failed to resolve host [" + bindHost + "]", e);
} }
@ -177,7 +180,7 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<Me
InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress(); InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
InetSocketAddress publishAddress; InetSocketAddress publishAddress;
try { try {
publishAddress = new InetSocketAddress(resolvePublishHostAddress(publishHost, settings), boundAddress.getPort()); publishAddress = new InetSocketAddress(networkService.resolvePublishHostAddress(publishHost), boundAddress.getPort());
} catch (Exception e) { } catch (Exception e) {
throw new BindTransportException("Failed to resolve publish address", e); throw new BindTransportException("Failed to resolve publish address", e);
} }