HBASE-12956 Binding to 0.0.0.0 is broken after HBASE-10569
Signed-off-by: Enis Soztutar <enis@apache.org>
This commit is contained in:
parent
9293bf26ea
commit
fc7f53f240
|
@ -196,7 +196,7 @@ public class RpcServer implements RpcServerInterface {
|
||||||
static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
|
static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
|
||||||
= new ThreadLocal<MonitoredRPCHandler>();
|
= new ThreadLocal<MonitoredRPCHandler>();
|
||||||
|
|
||||||
protected final InetSocketAddress isa;
|
protected final InetSocketAddress bindAddress;
|
||||||
protected int port; // port we listen on
|
protected int port; // port we listen on
|
||||||
private int readThreads; // number of read threads
|
private int readThreads; // number of read threads
|
||||||
protected int maxIdleTime; // the maximum idle time after
|
protected int maxIdleTime; // the maximum idle time after
|
||||||
|
@ -521,8 +521,8 @@ public class RpcServer implements RpcServerInterface {
|
||||||
acceptChannel = ServerSocketChannel.open();
|
acceptChannel = ServerSocketChannel.open();
|
||||||
acceptChannel.configureBlocking(false);
|
acceptChannel.configureBlocking(false);
|
||||||
|
|
||||||
// Bind the server socket to the local host and port
|
// Bind the server socket to the binding addrees (can be different from the default interface)
|
||||||
bind(acceptChannel.socket(), isa, backlogLength);
|
bind(acceptChannel.socket(), bindAddress, backlogLength);
|
||||||
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
|
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
|
||||||
// create a selector;
|
// create a selector;
|
||||||
selector= Selector.open();
|
selector= Selector.open();
|
||||||
|
@ -530,7 +530,8 @@ public class RpcServer implements RpcServerInterface {
|
||||||
readers = new Reader[readThreads];
|
readers = new Reader[readThreads];
|
||||||
readPool = Executors.newFixedThreadPool(readThreads,
|
readPool = Executors.newFixedThreadPool(readThreads,
|
||||||
new ThreadFactoryBuilder().setNameFormat(
|
new ThreadFactoryBuilder().setNameFormat(
|
||||||
"RpcServer.reader=%d,port=" + port).setDaemon(true).build());
|
"RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
|
||||||
|
",port=" + port).setDaemon(true).build());
|
||||||
for (int i = 0; i < readThreads; ++i) {
|
for (int i = 0; i < readThreads; ++i) {
|
||||||
Reader reader = new Reader();
|
Reader reader = new Reader();
|
||||||
readers[i] = reader;
|
readers[i] = reader;
|
||||||
|
@ -1869,17 +1870,18 @@ public class RpcServer implements RpcServerInterface {
|
||||||
* instance else pass null for no authentication check.
|
* instance else pass null for no authentication check.
|
||||||
* @param name Used keying this rpc servers' metrics and for naming the Listener thread.
|
* @param name Used keying this rpc servers' metrics and for naming the Listener thread.
|
||||||
* @param services A list of services.
|
* @param services A list of services.
|
||||||
* @param isa Where to listen
|
* @param bindAddres Where to listen
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public RpcServer(final Server server, final String name,
|
public RpcServer(final Server server, final String name,
|
||||||
final List<BlockingServiceAndInterface> services,
|
final List<BlockingServiceAndInterface> services,
|
||||||
final InetSocketAddress isa, Configuration conf,
|
final InetSocketAddress bindAddress, Configuration conf,
|
||||||
RpcScheduler scheduler)
|
RpcScheduler scheduler)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.services = services;
|
this.services = services;
|
||||||
this.isa = isa;
|
this.bindAddress = bindAddress;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.socketSendBufferSize = 0;
|
this.socketSendBufferSize = 0;
|
||||||
this.maxQueueSize =
|
this.maxQueueSize =
|
||||||
|
|
|
@ -779,6 +779,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
HConstants.DEFAULT_REGIONSERVER_PORT);
|
HConstants.DEFAULT_REGIONSERVER_PORT);
|
||||||
// Creation of a HSA will force a resolve.
|
// Creation of a HSA will force a resolve.
|
||||||
InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
|
InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
|
||||||
|
InetSocketAddress bindAddress = new InetSocketAddress(
|
||||||
|
rs.conf.get("hbase.regionserver.ipc.address", hostname), port);
|
||||||
if (initialIsa.getAddress() == null) {
|
if (initialIsa.getAddress() == null) {
|
||||||
throw new IllegalArgumentException("Failed resolve of " + initialIsa);
|
throw new IllegalArgumentException("Failed resolve of " + initialIsa);
|
||||||
}
|
}
|
||||||
|
@ -787,7 +789,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
// Set how many times to retry talking to another server over HConnection.
|
// Set how many times to retry talking to another server over HConnection.
|
||||||
ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
|
ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG);
|
||||||
rpcServer = new RpcServer(rs, name, getServices(),
|
rpcServer = new RpcServer(rs, name, getServices(),
|
||||||
initialIsa, // BindAddress is IP we got for this server.
|
bindAddress, // use final bindAddress for this server.
|
||||||
rs.conf,
|
rs.conf,
|
||||||
rpcSchedulerFactory.create(rs.conf, this, rs));
|
rpcSchedulerFactory.create(rs.conf, this, rs));
|
||||||
|
|
||||||
|
@ -798,17 +800,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
||||||
|
|
||||||
// Set our address.
|
// Set our address, however we need the final port that was given to rpcServer
|
||||||
isa = rpcServer.getListenerAddress();
|
isa = new InetSocketAddress(initialIsa.getHostName(), rpcServer.getListenerAddress().getPort());
|
||||||
rpcServer.setErrorHandler(this);
|
rpcServer.setErrorHandler(this);
|
||||||
rs.setName(name);
|
rs.setName(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getHostname(Configuration conf) throws UnknownHostException {
|
public static String getHostname(Configuration conf) throws UnknownHostException {
|
||||||
return conf.get("hbase.regionserver.ipc.address",
|
return Strings.domainNamePointerToHostName(DNS.getDefaultHost(
|
||||||
Strings.domainNamePointerToHostName(DNS.getDefaultHost(
|
|
||||||
conf.get("hbase.regionserver.dns.interface", "default"),
|
conf.get("hbase.regionserver.dns.interface", "default"),
|
||||||
conf.get("hbase.regionserver.dns.nameserver", "default"))));
|
conf.get("hbase.regionserver.dns.nameserver", "default")));
|
||||||
}
|
}
|
||||||
|
|
||||||
RegionScanner getScanner(long scannerId) {
|
RegionScanner getScanner(long scannerId) {
|
||||||
|
|
|
@ -124,6 +124,18 @@ public class TestHBaseTestingUtility {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMiniClusterBindToWildcard() throws Exception {
|
||||||
|
HBaseTestingUtility hbt = new HBaseTestingUtility();
|
||||||
|
hbt.getConfiguration().set("hbase.regionserver.ipc.address", "0.0.0.0");
|
||||||
|
MiniHBaseCluster cluster = hbt.startMiniCluster();
|
||||||
|
try {
|
||||||
|
assertEquals(1, cluster.getLiveRegionServerThreads().size());
|
||||||
|
} finally {
|
||||||
|
hbt.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that we can start and stop multiple time a cluster
|
* Test that we can start and stop multiple time a cluster
|
||||||
* with the same HBaseTestingUtility.
|
* with the same HBaseTestingUtility.
|
||||||
|
|
Loading…
Reference in New Issue