allow to more easily plug custom unicast host providers by being able to add them to ZenDiscoveryModule using a plugin

This commit is contained in:
Shay Banon 2012-03-31 21:38:39 +03:00
parent 85ab25126f
commit 9fb6ecf9f0
5 changed files with 39 additions and 8 deletions

View File

@ -19,18 +19,36 @@
package org.elasticsearch.discovery.zen;
import com.google.common.collect.Lists;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
import java.util.List;
/**
*
*/
public class ZenDiscoveryModule extends AbstractModule {
private final List<Class<? extends UnicastHostsProvider>> unicastHostProviders = Lists.newArrayList();
/**
* Adds a custom unicast hosts provider to build a dynamic list of unicast hosts list when doing unicast discovery.
*/
public ZenDiscoveryModule addUnicastHostProvider(Class<? extends UnicastHostsProvider> unicastHostProvider) {
unicastHostProviders.add(unicastHostProvider);
return this;
}
@Override
protected void configure() {
bind(ZenPingService.class).asEagerSingleton();
Multibinder<UnicastHostsProvider> unicastHostsProviderMultibinder = Multibinder.newSetBinder(binder(), UnicastHostsProvider.class);
for (Class<? extends UnicastHostsProvider> unicastHostProvider : unicastHostProviders) {
unicastHostsProviderMultibinder.addBinding().to(unicastHostProvider);
}
bindDiscovery();
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkService;
@ -31,10 +32,12 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@ -49,7 +52,8 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
private volatile ImmutableList<? extends ZenPing> zenPings = ImmutableList.of();
@Inject
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService) {
public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService,
@Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
super(settings);
ImmutableList.Builder<ZenPing> zenPingsBuilder = ImmutableList.builder();
@ -57,7 +61,7 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService));
}
// always add the unicast hosts, so it will be able to receive unicast requests even when working in multicast
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName));
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, unicastHostsProviders));
this.zenPings = zenPingsBuilder.build();
}

View File

@ -24,9 +24,12 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import java.util.List;
/**
*
* A pluggable provider of the list of unicast hosts to use for unicast discovery.
*/
public interface UnicastHostsProvider {
/**
* Builds the dynamic list of unicast hosts to be used for unicast discovery.
*/
List<DiscoveryNode> buildDynamicNodes();
}

View File

@ -83,15 +83,21 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
private final CopyOnWriteArrayList<UnicastHostsProvider> hostsProviders = new CopyOnWriteArrayList<UnicastHostsProvider>();
public UnicastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
this(EMPTY_SETTINGS, threadPool, transportService, clusterName);
this(EMPTY_SETTINGS, threadPool, transportService, clusterName, null);
}
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, @Nullable Set<UnicastHostsProvider> unicastHostsProviders) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.clusterName = clusterName;
if (unicastHostsProviders != null) {
for (UnicastHostsProvider unicastHostsProvider : unicastHostsProviders) {
addHostsProvider(unicastHostsProvider);
}
}
this.concurrentConnects = componentSettings.getAsInt("concurrent_connects", 10);
String[] hostArr = componentSettings.getAsArray("hosts");
// trim the hosts

View File

@ -64,7 +64,7 @@ public class UnicastZenPingTests {
addressB.address().getAddress().getHostAddress() + ":" + addressB.address().getPort())
.build();
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName);
UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, null);
zenPingA.setNodesProvider(new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {
@ -78,7 +78,7 @@ public class UnicastZenPingTests {
});
zenPingA.start();
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName);
UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, null);
zenPingB.setNodesProvider(new DiscoveryNodesProvider() {
@Override
public DiscoveryNodes nodes() {