UnicastZenPing shouldn't ping the address of the local node (#23567)

Pinging the local node address doesn't really add to discovering other nodes. It just pollutes the logs with unneeded information.
This commit is contained in:
Boaz Leskes 2017-03-14 06:47:41 -07:00
parent 3200da0327
commit c0cafa786b
3 changed files with 108 additions and 10 deletions

View File

@ -65,6 +65,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
@ -214,6 +215,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
final List<Future<TransportAddress[]>> futures =
executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
final List<DiscoveryNode> discoveryNodes = new ArrayList<>();
final Set<TransportAddress> localAddresses = new HashSet<>();
localAddresses.add(transportService.boundAddress().publishAddress());
localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses()));
// ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
// hostname with the corresponding task by iterating together
final Iterator<String> it = hosts.iterator();
@ -225,13 +229,17 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
final TransportAddress[] addresses = future.get();
logger.trace("resolved host [{}] to {}", hostname, addresses);
for (int addressId = 0; addressId < addresses.length; addressId++) {
discoveryNodes.add(
new DiscoveryNode(
nodeId_prefix + hostname + "_" + addressId + "#",
addresses[addressId],
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion()));
final TransportAddress address = addresses[addressId];
// no point in pinging ourselves
if (localAddresses.contains(address) == false) {
discoveryNodes.add(
new DiscoveryNode(
nodeId_prefix + hostname + "_" + addressId + "#",
address,
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion()));
}
}
} catch (final ExecutionException e) {
assert e.getCause() != null;

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
@ -61,6 +62,7 @@ import org.mockito.Matchers;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@ -423,7 +425,16 @@ public class UnicastZenPingTests extends ESTestCase {
new NoneCircuitBreakerService(),
new NamedWriteableRegistry(Collections.emptyList()),
networkService,
Version.CURRENT);
Version.CURRENT) {
@Override
public BoundTransportAddress boundAddress() {
return new BoundTransportAddress(
new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9500)},
new TransportAddress(InetAddress.getLoopbackAddress(), 9500)
);
}
};
closeables.push(transport);
final TransportService transportService =
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null);
@ -446,6 +457,50 @@ public class UnicastZenPingTests extends ESTestCase {
assertThat(ports, equalTo(IntStream.range(9300, 9300 + limitPortCounts).mapToObj(m -> m).collect(Collectors.toSet())));
}
public void testRemovingLocalAddresses() throws InterruptedException {
final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList());
final InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
final Transport transport = new MockTcpTransport(
Settings.EMPTY,
threadPool,
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
new NamedWriteableRegistry(Collections.emptyList()),
networkService,
Version.CURRENT) {
@Override
public BoundTransportAddress boundAddress() {
return new BoundTransportAddress(
new TransportAddress[]{
new TransportAddress(loopbackAddress, 9300),
new TransportAddress(loopbackAddress, 9301)
},
new TransportAddress(loopbackAddress, 9302)
);
}
};
closeables.push(transport);
final TransportService transportService =
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null);
closeables.push(transportService);
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
executorService,
logger,
Collections.singletonList(NetworkAddress.format(loopbackAddress)),
10,
transportService,
"test_",
TimeValue.timeValueSeconds(1));
assertThat(discoveryNodes, hasSize(7));
final Set<Integer> ports = new HashSet<>();
for (final DiscoveryNode discoveryNode : discoveryNodes) {
assertTrue(discoveryNode.getAddress().address().getAddress().isLoopbackAddress());
ports.add(discoveryNode.getAddress().getPort());
}
assertThat(ports, equalTo(IntStream.range(9303, 9310).mapToObj(m -> m).collect(Collectors.toSet())));
}
public void testUnknownHost() throws InterruptedException {
final Logger logger = mock(Logger.class);
final NetworkService networkService = new NetworkService(Settings.EMPTY, Collections.emptyList());
@ -460,6 +515,14 @@ public class UnicastZenPingTests extends ESTestCase {
networkService,
Version.CURRENT) {
@Override
public BoundTransportAddress boundAddress() {
return new BoundTransportAddress(
new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9300)},
new TransportAddress(InetAddress.getLoopbackAddress(), 9300)
);
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
throw unknownHostException;
@ -499,6 +562,14 @@ public class UnicastZenPingTests extends ESTestCase {
networkService,
Version.CURRENT) {
@Override
public BoundTransportAddress boundAddress() {
return new BoundTransportAddress(
new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9500)},
new TransportAddress(InetAddress.getLoopbackAddress(), 9500)
);
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
if ("hostname1".equals(address)) {
@ -703,7 +774,15 @@ public class UnicastZenPingTests extends ESTestCase {
new NoneCircuitBreakerService(),
new NamedWriteableRegistry(Collections.emptyList()),
networkService,
Version.CURRENT);
Version.CURRENT) {
@Override
public BoundTransportAddress boundAddress() {
return new BoundTransportAddress(
new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9300)},
new TransportAddress(InetAddress.getLoopbackAddress(), 9300)
);
}
};
closeables.push(transport);
final TransportService transportService =

View File

@ -23,6 +23,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@ -37,6 +39,7 @@ import org.junit.Before;
import java.io.BufferedWriter;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
@ -85,7 +88,15 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
BigArrays.NON_RECYCLING_INSTANCE,
new NoneCircuitBreakerService(),
new NamedWriteableRegistry(Collections.emptyList()),
new NetworkService(Settings.EMPTY, Collections.emptyList()));
new NetworkService(Settings.EMPTY, Collections.emptyList())) {
@Override
public BoundTransportAddress boundAddress() {
return new BoundTransportAddress(
new TransportAddress[]{new TransportAddress(InetAddress.getLoopbackAddress(), 9300)},
new TransportAddress(InetAddress.getLoopbackAddress(), 9300)
);
}
};
transportService = new MockTransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
null);
}