From 9fb6ecf9f046b76bc04d1e3da6cfdc00c17d64cf Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Sat, 31 Mar 2012 21:38:39 +0300 Subject: [PATCH] allow to more easily plug custom unicast host providers by being able to add them to ZenDiscoveryModule using a plugin --- .../discovery/zen/ZenDiscoveryModule.java | 20 ++++++++++++++++++- .../discovery/zen/ping/ZenPingService.java | 8 ++++++-- .../ping/unicast/UnicastHostsProvider.java | 5 ++++- .../zen/ping/unicast/UnicastZenPing.java | 10 ++++++++-- .../zen/ping/unicast/UnicastZenPingTests.java | 4 ++-- 5 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java index 202084d586b..71807c633e2 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscoveryModule.java @@ -19,18 +19,36 @@ package org.elasticsearch.discovery.zen; +import com.google.common.collect.Lists; import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.zen.ping.ZenPingService; +import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; + +import java.util.List; /** - * */ public class ZenDiscoveryModule extends AbstractModule { + private final List> unicastHostProviders = Lists.newArrayList(); + + /** + * Adds a custom unicast hosts provider to build a dynamic list of unicast hosts list when doing unicast discovery. + */ + public ZenDiscoveryModule addUnicastHostProvider(Class unicastHostProvider) { + unicastHostProviders.add(unicastHostProvider); + return this; + } + @Override protected void configure() { bind(ZenPingService.class).asEagerSingleton(); + Multibinder unicastHostsProviderMultibinder = Multibinder.newSetBinder(binder(), UnicastHostsProvider.class); + for (Class unicastHostProvider : unicastHostProviders) { + unicastHostsProviderMultibinder.addBinding().to(unicastHostProvider); + } bindDiscovery(); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java index 91689cc7cb2..b3135a38e58 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/ZenPingService.java @@ -24,6 +24,7 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.network.NetworkService; @@ -31,10 +32,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.ping.multicast.MulticastZenPing; +import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider; import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -49,7 +52,8 @@ public class ZenPingService extends AbstractLifecycleComponent implemen private volatile ImmutableList zenPings = ImmutableList.of(); @Inject - public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService) { + public ZenPingService(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, NetworkService networkService, + @Nullable Set unicastHostsProviders) { super(settings); ImmutableList.Builder zenPingsBuilder = ImmutableList.builder(); @@ -57,7 +61,7 @@ public class ZenPingService extends AbstractLifecycleComponent implemen zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService)); } // always add the unicast hosts, so it will be able to receive unicast requests even when working in multicast - zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName)); + zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName, unicastHostsProviders)); this.zenPings = zenPingsBuilder.build(); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastHostsProvider.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastHostsProvider.java index 1b5f8365737..737a117fa23 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastHostsProvider.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastHostsProvider.java @@ -24,9 +24,12 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import java.util.List; /** - * + * A pluggable provider of the list of unicast hosts to use for unicast discovery. */ public interface UnicastHostsProvider { + /** + * Builds the dynamic list of unicast hosts to be used for unicast discovery. + */ List buildDynamicNodes(); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 6e8dca656ae..c1b01ff0871 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -83,15 +83,21 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen private final CopyOnWriteArrayList hostsProviders = new CopyOnWriteArrayList(); public UnicastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { - this(EMPTY_SETTINGS, threadPool, transportService, clusterName); + this(EMPTY_SETTINGS, threadPool, transportService, clusterName, null); } - public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { + public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName, @Nullable Set unicastHostsProviders) { super(settings); this.threadPool = threadPool; this.transportService = transportService; this.clusterName = clusterName; + if (unicastHostsProviders != null) { + for (UnicastHostsProvider unicastHostsProvider : unicastHostsProviders) { + addHostsProvider(unicastHostsProvider); + } + } + this.concurrentConnects = componentSettings.getAsInt("concurrent_connects", 10); String[] hostArr = componentSettings.getAsArray("hosts"); // trim the hosts diff --git a/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/unicast/UnicastZenPingTests.java b/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/unicast/UnicastZenPingTests.java index ebeb09e8776..9a8effbf89a 100644 --- a/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/unicast/UnicastZenPingTests.java +++ b/src/test/java/org/elasticsearch/test/unit/discovery/zen/ping/unicast/UnicastZenPingTests.java @@ -64,7 +64,7 @@ public class UnicastZenPingTests { addressB.address().getAddress().getHostAddress() + ":" + addressB.address().getPort()) .build(); - UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName); + UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, null); zenPingA.setNodesProvider(new DiscoveryNodesProvider() { @Override public DiscoveryNodes nodes() { @@ -78,7 +78,7 @@ public class UnicastZenPingTests { }); zenPingA.start(); - UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName); + UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, null); zenPingB.setNodesProvider(new DiscoveryNodesProvider() { @Override public DiscoveryNodes nodes() {