diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index bd39ac3fc4a..0f12e90ebe2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -171,8 +171,7 @@ class AsyncConnectionImpl implements AsyncConnection {
         }
       }, conf, listenerClass);
     } catch (IOException e) {
-      LOG.warn("Failed to create ClusterStatusListener, not a critical problem, ignoring...",
-        e);
+      LOG.warn("Failed to create ClusterStatusListener; not critical, ignoring...", e);
     }
   }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
index 73612381530..ccdfec7f4f6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
@@ -37,10 +37,6 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -51,8 +47,10 @@ import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
 import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
 import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
-
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A class that receives the cluster status, and provide it as a set of service to the client.
@@ -208,10 +206,9 @@ class ClusterStatusListener implements Closeable {
       try {
         Bootstrap b = new Bootstrap();
         b.group(group)
-            .channel(NioDatagramChannel.class)
-            .option(ChannelOption.SO_REUSEADDR, true)
-            .handler(new ClusterStatusHandler());
-
+          .channel(NioDatagramChannel.class)
+          .option(ChannelOption.SO_REUSEADDR, true)
+          .handler(new ClusterStatusHandler());
         channel = (DatagramChannel)b.bind(bindAddress, port).sync().channel();
       } catch (InterruptedException e) {
         close();
@@ -225,9 +222,11 @@ class ClusterStatusListener implements Closeable {
         ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
       }
 
+      LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
       channel.joinGroup(ina, ni, null, channel.newPromise());
     }
 
+
     @Override
     public void close() {
       if (channel != null) {
@@ -252,8 +251,7 @@ class ClusterStatusListener implements Closeable {
     }
 
     @Override
-    public boolean acceptInboundMessage(Object msg)
-        throws Exception {
+    public boolean acceptInboundMessage(Object msg) throws Exception {
       return super.acceptInboundMessage(msg);
     }
 
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
index b5197a0572d..86dcb2cbdc1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
@@ -373,7 +373,7 @@ public abstract class ScheduledChore implements Runnable {
   @InterfaceAudience.Private
   @Override
   public String toString() {
-    return "[ScheduledChore: Name: " + getName() + " Period: " + getPeriod() + " Unit: "
-        + getTimeUnit() + "]";
+    return "ScheduledChore name=" + getName() + ", period=" + getPeriod() +
+      ", unit=" + getTimeUnit();
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
index af35ce4eb30..825746695f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -49,12 +48,11 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
-
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
-import org.apache.hbase.thirdparty.io.netty.bootstrap.ChannelFactory;
 import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFactory;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -65,6 +63,8 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamily;
 import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
 import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -75,6 +75,7 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
  */
 @InterfaceAudience.Private
 public class ClusterStatusPublisher extends ScheduledChore {
+  private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusPublisher.class);
   /**
    * The implementation class used to publish the status. Default is null (no publish).
    * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
@@ -113,7 +114,7 @@ public class ClusterStatusPublisher extends ScheduledChore {
   public ClusterStatusPublisher(HMaster master, Configuration conf,
       Class<? extends Publisher> publisherClass)
       throws IOException {
-    super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt(
+    super("ClusterStatusPublisher for=" + master.getName(), master, conf.getInt(
       STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
     this.master = master;
     this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
@@ -126,6 +127,11 @@ public class ClusterStatusPublisher extends ScheduledChore {
     connected = true;
   }
 
+  @Override
+  public String toString() {
+    return super.toString() + ", publisher=" + this.publisher + ", connected=" + this.connected;
+  }
+
   // For tests only
   protected ClusterStatusPublisher() {
     master = null;
@@ -245,6 +251,11 @@ public class ClusterStatusPublisher extends ScheduledChore {
     public MulticastPublisher() {
     }
 
+    @Override
+    public String toString() {
+      return "channel=" + this.channel;
+    }
+
     @Override
     public void connect(Configuration conf) throws IOException {
       String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
@@ -262,7 +273,6 @@ public class ClusterStatusPublisher extends ScheduledChore {
         close();
         throw new IOException("Can't connect to " + mcAddress, e);
       }
-
       final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
 
       InternetProtocolFamily family;
@@ -285,17 +295,23 @@ public class ClusterStatusPublisher extends ScheduledChore {
         }
         ni = NetworkInterface.getByInetAddress(localAddress);
       }
-
       Bootstrap b = new Bootstrap();
       b.group(group)
        .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
        .option(ChannelOption.SO_REUSEADDR, true)
        .handler(new ClusterMetricsEncoder(isa));
-
       try {
LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina); channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel(); channel.joinGroup(ina, ni, null, channel.newPromise()).sync(); channel.connect(isa).sync(); + // Set into configuration in case many networks available. Do this for tests so that + // server and client use same Interface (presuming share same Configuration). + // TestAsyncTableRSCrashPublish was failing when connected to VPN because extra networks + // available with Master binding on one Interface and client on another so test failed. + if (ni != null) { + conf.set(HConstants.STATUS_MULTICAST_NI_NAME, ni.getName()); + } } catch (InterruptedException e) { close(); throw ExceptionUtil.asInterrupt(e); @@ -303,9 +319,9 @@ public class ClusterStatusPublisher extends ScheduledChore { } private static final class HBaseDatagramChannelFactory - implements ChannelFactory { + implements ChannelFactory { private final Class clazz; - private InternetProtocolFamily family; + private final InternetProtocolFamily family; HBaseDatagramChannelFactory(Class clazz, InternetProtocolFamily family) { this.clazz = clazz; @@ -347,6 +363,7 @@ public class ClusterStatusPublisher extends ScheduledChore { @Override public void publish(ClusterMetrics cs) { + LOG.info("PUBLISH {}", cs); channel.writeAndFlush(cs).syncUninterruptibly(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 05117da038a..d0fd7f761ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -571,6 +571,7 @@ public class HMaster extends HRegionServer implements MasterServices { " is not set - not publishing status"); } else { clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass); + LOG.debug("Created {}", this.clusterStatusPublisherChore); getChoreService().scheduleChore(clusterStatusPublisherChore); } }