HBASE-23851 Log networks and bind addresses when multicast publisher/listener enabled (#1173)
Signed-off-by: Sean Busbey <busbey@apache.org> Signed-off-by: Nick Dimiduk <ndimiduk@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
5750876191
commit
e1e8f396ca
|
@ -171,8 +171,7 @@ class AsyncConnectionImpl implements AsyncConnection {
|
||||||
}
|
}
|
||||||
}, conf, listenerClass);
|
}, conf, listenerClass);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed to create ClusterStatusListener, not a critical problem, ignoring...",
|
LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
|
||||||
e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,10 +37,6 @@ import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.util.Addressing;
|
import org.apache.hadoop.hbase.util.Addressing;
|
||||||
import org.apache.hadoop.hbase.util.ExceptionUtil;
|
import org.apache.hadoop.hbase.util.ExceptionUtil;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
|
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
|
||||||
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
|
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
||||||
|
@ -51,8 +47,10 @@ import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
|
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
|
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
|
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A class that receives the cluster status, and provide it as a set of service to the client.
|
* A class that receives the cluster status, and provide it as a set of service to the client.
|
||||||
|
@ -208,10 +206,9 @@ class ClusterStatusListener implements Closeable {
|
||||||
try {
|
try {
|
||||||
Bootstrap b = new Bootstrap();
|
Bootstrap b = new Bootstrap();
|
||||||
b.group(group)
|
b.group(group)
|
||||||
.channel(NioDatagramChannel.class)
|
.channel(NioDatagramChannel.class)
|
||||||
.option(ChannelOption.SO_REUSEADDR, true)
|
.option(ChannelOption.SO_REUSEADDR, true)
|
||||||
.handler(new ClusterStatusHandler());
|
.handler(new ClusterStatusHandler());
|
||||||
|
|
||||||
channel = (DatagramChannel)b.bind(bindAddress, port).sync().channel();
|
channel = (DatagramChannel)b.bind(bindAddress, port).sync().channel();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
close();
|
close();
|
||||||
|
@ -225,9 +222,11 @@ class ClusterStatusListener implements Closeable {
|
||||||
ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
|
ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
|
||||||
channel.joinGroup(ina, ni, null, channel.newPromise());
|
channel.joinGroup(ina, ni, null, channel.newPromise());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
if (channel != null) {
|
if (channel != null) {
|
||||||
|
@ -252,8 +251,7 @@ class ClusterStatusListener implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean acceptInboundMessage(Object msg)
|
public boolean acceptInboundMessage(Object msg) throws Exception {
|
||||||
throws Exception {
|
|
||||||
return super.acceptInboundMessage(msg);
|
return super.acceptInboundMessage(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -373,7 +373,7 @@ public abstract class ScheduledChore implements Runnable {
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "[ScheduledChore: Name: " + getName() + " Period: " + getPeriod() + " Unit: "
|
return "ScheduledChore name=" + getName() + ", period=" + getPeriod() +
|
||||||
+ getTimeUnit() + "]";
|
", unit=" + getTimeUnit();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
/**
|
/*
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -49,12 +48,11 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
|
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
|
||||||
import org.apache.hbase.thirdparty.io.netty.bootstrap.ChannelFactory;
|
|
||||||
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
|
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFactory;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
|
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
|
||||||
|
@ -65,6 +63,8 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamil
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
|
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
|
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
|
||||||
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
|
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -75,6 +75,7 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ClusterStatusPublisher extends ScheduledChore {
|
public class ClusterStatusPublisher extends ScheduledChore {
|
||||||
|
private static Logger LOG = LoggerFactory.getLogger(ClusterStatusPublisher.class);
|
||||||
/**
|
/**
|
||||||
* The implementation class used to publish the status. Default is null (no publish).
|
* The implementation class used to publish the status. Default is null (no publish).
|
||||||
* Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
|
* Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
|
||||||
|
@ -113,7 +114,7 @@ public class ClusterStatusPublisher extends ScheduledChore {
|
||||||
public ClusterStatusPublisher(HMaster master, Configuration conf,
|
public ClusterStatusPublisher(HMaster master, Configuration conf,
|
||||||
Class<? extends Publisher> publisherClass)
|
Class<? extends Publisher> publisherClass)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt(
|
super("ClusterStatusPublisher for=" + master.getName(), master, conf.getInt(
|
||||||
STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
|
STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
|
||||||
this.master = master;
|
this.master = master;
|
||||||
this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
|
this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
|
||||||
|
@ -126,6 +127,11 @@ public class ClusterStatusPublisher extends ScheduledChore {
|
||||||
connected = true;
|
connected = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return super.toString() + ", publisher=" + this.publisher + ", connected=" + this.connected;
|
||||||
|
}
|
||||||
|
|
||||||
// For tests only
|
// For tests only
|
||||||
protected ClusterStatusPublisher() {
|
protected ClusterStatusPublisher() {
|
||||||
master = null;
|
master = null;
|
||||||
|
@ -245,6 +251,11 @@ public class ClusterStatusPublisher extends ScheduledChore {
|
||||||
public MulticastPublisher() {
|
public MulticastPublisher() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "channel=" + this.channel;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void connect(Configuration conf) throws IOException {
|
public void connect(Configuration conf) throws IOException {
|
||||||
String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
|
String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
|
||||||
|
@ -262,7 +273,6 @@ public class ClusterStatusPublisher extends ScheduledChore {
|
||||||
close();
|
close();
|
||||||
throw new IOException("Can't connect to " + mcAddress, e);
|
throw new IOException("Can't connect to " + mcAddress, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
|
final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
|
||||||
|
|
||||||
InternetProtocolFamily family;
|
InternetProtocolFamily family;
|
||||||
|
@ -285,17 +295,23 @@ public class ClusterStatusPublisher extends ScheduledChore {
|
||||||
}
|
}
|
||||||
ni = NetworkInterface.getByInetAddress(localAddress);
|
ni = NetworkInterface.getByInetAddress(localAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
Bootstrap b = new Bootstrap();
|
Bootstrap b = new Bootstrap();
|
||||||
b.group(group)
|
b.group(group)
|
||||||
.channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
|
.channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
|
||||||
.option(ChannelOption.SO_REUSEADDR, true)
|
.option(ChannelOption.SO_REUSEADDR, true)
|
||||||
.handler(new ClusterMetricsEncoder(isa));
|
.handler(new ClusterMetricsEncoder(isa));
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
|
||||||
channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
|
channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
|
||||||
channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
|
channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
|
||||||
channel.connect(isa).sync();
|
channel.connect(isa).sync();
|
||||||
|
// Set into configuration in case many networks available. Do this for tests so that
|
||||||
|
// server and client use same Interface (presuming share same Configuration).
|
||||||
|
// TestAsyncTableRSCrashPublish was failing when connected to VPN because extra networks
|
||||||
|
// available with Master binding on one Interface and client on another so test failed.
|
||||||
|
if (ni != null) {
|
||||||
|
conf.set(HConstants.STATUS_MULTICAST_NI_NAME, ni.getName());
|
||||||
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
close();
|
close();
|
||||||
throw ExceptionUtil.asInterrupt(e);
|
throw ExceptionUtil.asInterrupt(e);
|
||||||
|
@ -303,9 +319,9 @@ public class ClusterStatusPublisher extends ScheduledChore {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class HBaseDatagramChannelFactory<T extends Channel>
|
private static final class HBaseDatagramChannelFactory<T extends Channel>
|
||||||
implements ChannelFactory<T> {
|
implements ChannelFactory<T> {
|
||||||
private final Class<? extends T> clazz;
|
private final Class<? extends T> clazz;
|
||||||
private InternetProtocolFamily family;
|
private final InternetProtocolFamily family;
|
||||||
|
|
||||||
HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
|
HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
|
||||||
this.clazz = clazz;
|
this.clazz = clazz;
|
||||||
|
@ -347,6 +363,7 @@ public class ClusterStatusPublisher extends ScheduledChore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void publish(ClusterMetrics cs) {
|
public void publish(ClusterMetrics cs) {
|
||||||
|
LOG.info("PUBLISH {}", cs);
|
||||||
channel.writeAndFlush(cs).syncUninterruptibly();
|
channel.writeAndFlush(cs).syncUninterruptibly();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -571,6 +571,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
" is not set - not publishing status");
|
" is not set - not publishing status");
|
||||||
} else {
|
} else {
|
||||||
clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
|
clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
|
||||||
|
LOG.debug("Created {}", this.clusterStatusPublisherChore);
|
||||||
getChoreService().scheduleChore(clusterStatusPublisherChore);
|
getChoreService().scheduleChore(clusterStatusPublisherChore);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue