finalize the connection listener support

This commit is contained in:
kimchy 2010-04-18 17:52:18 +03:00
parent 72629fc5ec
commit fcc32cbe21
5 changed files with 35 additions and 2 deletions

View File

@ -35,6 +35,7 @@ import org.elasticsearch.util.transport.LocalTransportAddress;
import org.elasticsearch.util.transport.TransportAddress;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
@ -81,6 +82,14 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
@Override protected void doStop() throws ElasticSearchException {
transports.remove(localAddress);
// now, go over all the transports connected to me, and raise disconnected event
for (LocalTransport targetTransport : transports.values()) {
for (Map.Entry<DiscoveryNode, LocalTransport> entry : targetTransport.connectedNodes.entrySet()) {
if (entry.getValue() == this) {
targetTransport.disconnectFromNode(entry.getKey());
}
}
}
}
@Override protected void doClose() throws ElasticSearchException {

View File

@ -542,6 +542,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
channel.close().awaitUninterruptibly();
}
}
logger.debug("Disconnected from [{}]", node);
transportServiceAdapter.raiseNodeDisconnected(node);
}
}

View File

@ -30,6 +30,8 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
@ -146,6 +148,23 @@ public abstract class AbstractSimpleTransportTests {
}
@Test
public void testDisconnectListener() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
TransportConnectionListener disconnectListener = new TransportConnectionListener() {
@Override public void onNodeConnected(DiscoveryNode node) {
throw new RuntimeException("Should not be called");
}
@Override public void onNodeDisconnected(DiscoveryNode node) {
latch.countDown();
}
};
serviceA.addConnectionListener(disconnectListener);
serviceB.close();
assertThat(latch.await(1, TimeUnit.SECONDS), equalTo(true));
}
private class StringMessage implements Streamable {
private String message;

View File

@ -24,14 +24,17 @@ import org.elasticsearch.transport.AbstractSimpleTransportTests;
import org.elasticsearch.transport.TransportService;
import org.testng.annotations.Test;
import static org.elasticsearch.util.settings.ImmutableSettings.*;
@Test
public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
@Override protected void build() {
serviceA = new TransportService(new NettyTransport(threadPool), threadPool).start();
serviceA = new TransportService(settingsBuilder().put("name", "A").build(), new NettyTransport(settingsBuilder().put("name", "A").build(), threadPool), threadPool).start();
serviceANode = new DiscoveryNode("A", serviceA.boundAddress().publishAddress());
serviceB = new TransportService(new NettyTransport(threadPool), threadPool).start();
serviceB = new TransportService(settingsBuilder().put("name", "B").build(), new NettyTransport(settingsBuilder().put("name", "B").build(), threadPool), threadPool).start();
serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress());
}
}

View File

@ -1,6 +1,7 @@
log4j.rootLogger=INFO, out
log4j.logger.jgroups=WARN
#log4j.logger.transport=TRACE
#log4j.logger.discovery=TRACE
#log4j.logger.cluster.service=TRACE
#log4j.logger.cluster.action.shard=DEBUG