HBASE-9452 Simplify the configuration of the multicast notifier

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1520999 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2013-09-09 08:53:29 +00:00
parent cf05666f03
commit 9de8402281
7 changed files with 83 additions and 25 deletions

View File

@ -66,10 +66,11 @@ class ClusterStatusListener implements Closeable {
private final Listener listener;
/**
* The implementation class to use to read the status. Default is null.
* The implementation class to use to read the status.
*/
public static final String STATUS_LISTENER_CLASS = "hbase.status.listener.class";
public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS = null;
public static final Class<? extends Listener> DEFAULT_STATUS_LISTENER_CLASS =
MulticastListener.class;
/**
* Class to be extended to manage a new dead server.
@ -87,7 +88,7 @@ class ClusterStatusListener implements Closeable {
/**
* The interface to be implented by a listener of a cluster status event.
* The interface to be implemented by a listener of a cluster status event.
*/
interface Listener extends Closeable {
/**
@ -175,13 +176,13 @@ class ClusterStatusListener implements Closeable {
/**
* An implementation using a multicast message between the master & the client.
*/
class MultiCastListener implements Listener {
class MulticastListener implements Listener {
private DatagramChannel channel;
private final ExecutorService service = Executors.newSingleThreadExecutor(
Threads.newDaemonThreadFactory("hbase-client-clusterStatus-multiCastListener"));
public MultiCastListener() {
public MulticastListener() {
}
public void connect(Configuration conf) throws IOException {

View File

@ -670,21 +670,28 @@ public class HConnectionManager {
this.rpcClient = new RpcClient(this.conf, this.clusterId);
// Do we publish the status?
boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
HConstants.STATUS_PUBLISHED_DEFAULT);
Class<? extends ClusterStatusListener.Listener> listenerClass =
conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
ClusterStatusListener.Listener.class);
if (listenerClass != null) {
clusterStatusListener = new ClusterStatusListener(
new ClusterStatusListener.DeadServerHandler() {
@Override
public void newDead(ServerName sn) {
clearCaches(sn);
rpcClient.cancelConnections(sn.getHostname(), sn.getPort(),
new SocketException(sn.getServerName() + " is dead: closing its connection."));
}
}, conf, listenerClass);
if (shouldListen) {
if (listenerClass == null) {
LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
} else {
clusterStatusListener = new ClusterStatusListener(
new ClusterStatusListener.DeadServerHandler() {
@Override
public void newDead(ServerName sn) {
clearCaches(sn);
rpcClient.cancelConnections(sn.getHostname(), sn.getPort(),
new SocketException(sn.getServerName() +
" is dead: closing its connection."));
}
}, conf, listenerClass);
}
}
}

View File

@ -865,6 +865,13 @@ public final class HConstants {
public static final int DEFAULT_HEALTH_FAILURE_THRESHOLD = 3;
/**
* Setting to activate, or not, the publication of the status by the master. Default
* notification is by a multicast message.
*/
public static final String STATUS_PUBLISHED = "hbase.status.published";
public static final boolean STATUS_PUBLISHED_DEFAULT = false;
/**
* IP to use for the multicast status messages between the master and the clients.
* The default address is chosen as one among others within the ones suitable for multicast

View File

@ -978,5 +978,43 @@ possible configurations would overwhelm and obscure the important.
</description>
</property>
<property>
<name>hbase.status.published</name>
<value>false</value>
<description>
This setting activates the publication by the master of the status of the region server.
When a region server dies and its recovery starts, the master will push this information
to the client application, to let them cut the connection immediately instead of waiting
for a timeout.
</description>
</property>
<property>
<name>hbase.status.publisher.class</name>
<value>org.apache.hadoop.hbase.master.ClusterStatusPublisher$MulticastPublisher</value>
<description>
Implementation of the status publication with a multicast message.
</description>
</property>
<property>
<name>hbase.status.listener.class</name>
<value>org.apache.hadoop.hbase.client.ClusterStatusListener$MulticastListener</value>
<description>
Implementation of the status listener with a multicast message.
</description>
</property>
<property>
<name>hbase.status.multicast.address.ip</name>
<value>226.1.1.3</value>
<description>
Multicast address to use for the status publication by multicast.
</description>
</property>
<property>
<name>hbase.status.multicast.address.port</name>
<value>6100</value>
<description>
Multicast port to use for the status publication by multicast.
</description>
</property>
</configuration>

View File

@ -72,7 +72,8 @@ public class ClusterStatusPublisher extends Chore {
*/
public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
public static final Class<? extends ClusterStatusPublisher.Publisher>
DEFAULT_STATUS_PUBLISHER_CLASS = null;
DEFAULT_STATUS_PUBLISHER_CLASS =
org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher.class;
/**
* The minimum time between two status messages, in milliseconds.

View File

@ -481,14 +481,22 @@ MasterServices, Server {
}
// Do we publish the status?
boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
HConstants.STATUS_PUBLISHED_DEFAULT);
Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
ClusterStatusPublisher.Publisher.class);
if (publisherClass != null) {
clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
if (shouldPublish) {
if (publisherClass == null) {
LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
" is not set - not publishing status");
} else {
clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
}
}
distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,

View File

@ -100,11 +100,7 @@ public class TestHCM {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
ClusterStatusPublisher.MulticastPublisher.class, ClusterStatusPublisher.Publisher.class);
TEST_UTIL.getConfiguration().setClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
ClusterStatusListener.MultiCastListener.class, ClusterStatusListener.Listener.class);
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
TEST_UTIL.startMiniCluster(2);
}