always start the unicast ping discovery, so unicast discovery will work even when using multicast

This commit is contained in:
kimchy 2010-05-31 22:24:51 +03:00
parent 1749775414
commit ec662cc019
6 changed files with 69 additions and 24 deletions

View File

@ -11,6 +11,7 @@
<w>blobstore</w>
<w>bool</w>
<w>booleans</w>
<w>cacheable</w>
<w>camelcase</w>
<w>canonicalhost</w>
<w>checksum</w>
@ -27,6 +28,7 @@
<w>datas</w>
<w>desc</w>
<w>deserialize</w>
<w>docid</w>
<w>elasticsearch</w>
<w>estab</w>
<w>failover</w>

View File

@ -55,9 +55,8 @@ public class ZenPingService extends AbstractLifecycleComponent<ZenPing> implemen
if (componentSettings.getAsBoolean("multicast.enabled", true)) {
zenPingsBuilder.add(new MulticastZenPing(settings, threadPool, transportService, clusterName, networkService));
}
if (componentSettings.get("unicast.hosts") != null || componentSettings.getAsArray("unicast.hosts").length > 0) {
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName));
}
// always add the unicast hosts, so it will be able to receive unicast requests even when working in multicast
zenPingsBuilder.add(new UnicastZenPing(settings, threadPool, transportService, clusterName));
this.zenPings = zenPingsBuilder.build();
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery.zen.ping.unicast;
import org.elasticsearch.cluster.node.DiscoveryNode;
import java.util.List;
/**
* @author kimchy (Shay Banon)
*/
public interface UnicastHostsProvider {
List<DiscoveryNode> buildDynamicNodes();
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.Strings;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.collect.ImmutableList;
import org.elasticsearch.util.collect.Lists;
import org.elasticsearch.util.component.AbstractLifecycleComponent;
import org.elasticsearch.util.concurrent.jsr166y.LinkedTransferQueue;
@ -44,10 +43,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -80,6 +76,8 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
// a list of temporal responses a node will return for a request (holds requests from other nodes)
private final Queue<PingResponse> temporalResponses = new LinkedTransferQueue<PingResponse>();
private final CopyOnWriteArrayList<UnicastHostsProvider> hostsProviders = new CopyOnWriteArrayList<UnicastHostsProvider>();
public UnicastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
this(EMPTY_SETTINGS, threadPool, transportService, clusterName);
}
@ -95,7 +93,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
hosts.addAll(Strings.commaDelimitedListToSet(componentSettings.get("hosts")));
}
logger.debug("Using hosts {}", hosts);
logger.debug("Using initial hosts {}", hosts);
List<DiscoveryNode> nodes = Lists.newArrayList();
int idCounter = 0;
@ -123,8 +121,12 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
transportService.removeHandler(UnicastPingRequestHandler.ACTION);
}
protected List<DiscoveryNode> buildDynamicNodes() {
return ImmutableList.of();
public void addHostsProvider(UnicastHostsProvider provider) {
hostsProviders.add(provider);
}
public void removeHostsProvider(UnicastHostsProvider provider) {
hostsProviders.remove(provider);
}
@Override public void setNodesProvider(DiscoveryNodesProvider nodesProvider) {
@ -169,7 +171,9 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
pingRequest.pingResponse = new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName);
List<DiscoveryNode> nodesToPing = newArrayList(nodes);
nodesToPing.addAll(buildDynamicNodes());
for (UnicastHostsProvider provider : hostsProviders) {
nodesToPing.addAll(provider.buildDynamicNodes());
}
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
for (final DiscoveryNode node : nodesToPing) {

View File

@ -23,7 +23,9 @@ import org.elasticsearch.cloud.compute.CloudComputeService;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.collect.ImmutableList;
@ -39,9 +41,18 @@ public class CloudDiscovery extends ZenDiscovery {
ClusterService clusterService, ZenPingService pingService, CloudComputeService computeService) {
super(settings, clusterName, threadPool, transportService, clusterService, pingService);
if (settings.getAsBoolean("cloud.enabled", true)) {
CloudZenPing cloudPing = new CloudZenPing(settings, threadPool, transportService, clusterName, computeService);
cloudPing.setNodesProvider(this);
pingService.zenPings(ImmutableList.of(cloudPing));
ImmutableList<? extends ZenPing> zenPings = pingService.zenPings();
UnicastZenPing unicastZenPing = null;
for (ZenPing zenPing : zenPings) {
if (zenPing instanceof UnicastZenPing) {
unicastZenPing = (UnicastZenPing) zenPing;
break;
}
}
// update the unicast zen ping to add cloud hosts provider
// and, while we are at it, use only it and not the multicast for example
unicastZenPing.addHostsProvider(new CloudUnicastHostsProvider(settings, computeService));
pingService.zenPings(ImmutableList.of(unicastZenPing));
}
}
}

View File

@ -20,11 +20,9 @@
package org.elasticsearch.discovery.cloud;
import org.elasticsearch.cloud.compute.CloudComputeService;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.discovery.zen.ping.unicast.UnicastHostsProvider;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.InetSocketTransportAddress;
import org.elasticsearch.util.transport.PortsRange;
@ -42,7 +40,7 @@ import static org.elasticsearch.util.collect.Lists.*;
/**
* @author kimchy (shay.banon)
*/
public class CloudZenPing extends UnicastZenPing {
public class CloudUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
private final ComputeService computeService;
@ -52,9 +50,8 @@ public class CloudZenPing extends UnicastZenPing {
private final String location;
public CloudZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName,
CloudComputeService computeService) {
super(settings, threadPool, transportService, clusterName);
public CloudUnicastHostsProvider(Settings settings, CloudComputeService computeService) {
super(settings);
this.computeService = computeService.context().getComputeService();
this.tag = componentSettings.get("tag");
this.location = componentSettings.get("location");
@ -63,7 +60,7 @@ public class CloudZenPing extends UnicastZenPing {
new PortsRange(ports).ports();
}
@Override protected List<DiscoveryNode> buildDynamicNodes() {
@Override public List<DiscoveryNode> buildDynamicNodes() {
List<DiscoveryNode> discoNodes = newArrayList();
Set<? extends ComputeMetadata> nodes = computeService.listNodes();
if (logger.isTraceEnabled()) {