From 2605bf7e89abeed597f86a9f8d6c4a294e203081 Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 6 May 2010 09:21:56 +0300 Subject: [PATCH] add unicast ping test --- .../zen/ping/unicast/UnicastZenPing.java | 23 +++++ .../zen/ping/unicast/UnicastZenPingTests.java | 95 +++++++++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 7f26a22dd19..e0d11813dd8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -49,11 +49,13 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.discovery.zen.ping.ZenPing.PingResponse.*; import static org.elasticsearch.util.TimeValue.*; import static org.elasticsearch.util.collect.Lists.*; import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; +import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; /** * @author kimchy (shay.banon) @@ -78,6 +80,10 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen // a list of temporal responses a node will return for a request (holds requests from other nodes) private final Queue temporalResponses = new LinkedTransferQueue(); + public UnicastZenPing(ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { + this(EMPTY_SETTINGS, threadPool, transportService, clusterName); + } + public UnicastZenPing(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) { super(settings); this.threadPool = threadPool; @@ -122,6 +128,23 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen this.nodesProvider = nodesProvider; } + public PingResponse[] pingAndWait(TimeValue timeout) { + final AtomicReference response = new AtomicReference(); + final CountDownLatch latch = new CountDownLatch(1); + ping(new PingListener() { + @Override public void onPing(PingResponse[] pings) { + response.set(pings); + latch.countDown(); + } + }, timeout); + try { + latch.await(); + return response.get(); + } catch (InterruptedException e) { + return null; + } + } + @Override public void ping(final PingListener listener, final TimeValue timeout) throws ElasticSearchException { final int id = pingIdGenerator.incrementAndGet(); receivedResponses.put(id, new ConcurrentHashMap()); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java new file mode 100644 index 00000000000..9f90a7138a6 --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPingTests.java @@ -0,0 +1,95 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.zen.ping.unicast; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; +import org.elasticsearch.discovery.zen.ping.ZenPing; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.cached.CachedThreadPool; +import org.elasticsearch.timer.TimerService; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.netty.NettyTransport; +import org.elasticsearch.util.TimeValue; +import org.elasticsearch.util.settings.ImmutableSettings; +import org.elasticsearch.util.settings.Settings; +import org.elasticsearch.util.transport.InetSocketTransportAddress; +import org.testng.annotations.Test; + +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + * @author kimchy (shay.banon) + */ +public class UnicastZenPingTests { + + @Test public void testSimplePings() { + ThreadPool threadPool = new CachedThreadPool(); + TimerService timerService = new TimerService(threadPool); + ClusterName clusterName = new ClusterName("test"); + NettyTransport transportA = new NettyTransport(threadPool); + final TransportService transportServiceA = new TransportService(transportA, threadPool, timerService).start(); + final DiscoveryNode nodeA = new DiscoveryNode("A", transportServiceA.boundAddress().publishAddress()); + + InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress(); + + NettyTransport transportB = new NettyTransport(threadPool); + final TransportService transportServiceB = new TransportService(transportB, threadPool, timerService).start(); + final DiscoveryNode nodeB = new DiscoveryNode("B", transportServiceA.boundAddress().publishAddress()); + + InetSocketTransportAddress addressB = (InetSocketTransportAddress) transportB.boundAddress().publishAddress(); + + Settings hostsSettings = ImmutableSettings.settingsBuilder().putArray("discovery.zen.ping.unicast.hosts", + addressA.address().getAddress().getHostAddress() + ":" + addressA.address().getPort(), + addressB.address().getAddress().getHostAddress() + ":" + addressB.address().getPort()) + .build(); + + UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName); + zenPingA.setNodesProvider(new DiscoveryNodesProvider() { + @Override public DiscoveryNodes nodes() { + return DiscoveryNodes.newNodesBuilder().put(nodeA).localNodeId("A").build(); + } + }); + zenPingA.start(); + + UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName); + zenPingB.setNodesProvider(new DiscoveryNodesProvider() { + @Override public DiscoveryNodes nodes() { + return DiscoveryNodes.newNodesBuilder().put(nodeB).localNodeId("B").build(); + } + }); + zenPingB.start(); + + try { + ZenPing.PingResponse[] pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(1)); + assertThat(pingResponses.length, equalTo(1)); + assertThat(pingResponses[0].target().id(), equalTo("B")); + } finally { + zenPingA.close(); + zenPingB.close(); + transportServiceA.close(); + transportServiceB.close(); + threadPool.shutdown(); + } + } +}