HDFS-16603. Improve DatanodeHttpServer With Netty recommended method. (#4372)
Co-authored-by: slfan1989 <louj1988@@>
This commit is contained in:
parent
7c66266b4e
commit
e85f827f0a
|
@ -17,8 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.web;
|
package org.apache.hadoop.hdfs.server.datanode.web;
|
||||||
|
|
||||||
import io.netty.bootstrap.ChannelFactory;
|
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
|
import io.netty.channel.ChannelFactory;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.ChannelInitializer;
|
||||||
|
@ -28,6 +28,7 @@ import io.netty.channel.EventLoopGroup;
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
import io.netty.channel.socket.SocketChannel;
|
import io.netty.channel.socket.SocketChannel;
|
||||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||||
|
import io.netty.channel.WriteBufferWaterMark;
|
||||||
import io.netty.handler.codec.http.HttpRequestDecoder;
|
import io.netty.handler.codec.http.HttpRequestDecoder;
|
||||||
import io.netty.handler.codec.http.HttpResponseEncoder;
|
import io.netty.handler.codec.http.HttpResponseEncoder;
|
||||||
import io.netty.handler.ssl.SslHandler;
|
import io.netty.handler.ssl.SslHandler;
|
||||||
|
@ -168,15 +169,13 @@ public class DatanodeHttpServer implements Closeable {
|
||||||
});
|
});
|
||||||
|
|
||||||
this.httpServer.childOption(
|
this.httpServer.childOption(
|
||||||
ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK,
|
ChannelOption.WRITE_BUFFER_WATER_MARK,
|
||||||
conf.getInt(
|
new WriteBufferWaterMark(conf.getInt(
|
||||||
DFSConfigKeys.DFS_WEBHDFS_NETTY_HIGH_WATERMARK,
|
DFSConfigKeys.DFS_WEBHDFS_NETTY_LOW_WATERMARK,
|
||||||
DFSConfigKeys.DFS_WEBHDFS_NETTY_HIGH_WATERMARK_DEFAULT));
|
DFSConfigKeys.DFS_WEBHDFS_NETTY_LOW_WATERMARK_DEFAULT),
|
||||||
this.httpServer.childOption(
|
conf.getInt(
|
||||||
ChannelOption.WRITE_BUFFER_LOW_WATER_MARK,
|
DFSConfigKeys.DFS_WEBHDFS_NETTY_HIGH_WATERMARK,
|
||||||
conf.getInt(
|
DFSConfigKeys.DFS_WEBHDFS_NETTY_HIGH_WATERMARK_DEFAULT)));
|
||||||
DFSConfigKeys.DFS_WEBHDFS_NETTY_LOW_WATERMARK,
|
|
||||||
DFSConfigKeys.DFS_WEBHDFS_NETTY_LOW_WATERMARK_DEFAULT));
|
|
||||||
|
|
||||||
if (externalHttpChannel == null) {
|
if (externalHttpChannel == null) {
|
||||||
httpServer.channel(NioServerSocketChannel.class);
|
httpServer.channel(NioServerSocketChannel.class);
|
||||||
|
@ -303,18 +302,7 @@ public class DatanodeHttpServer implements Closeable {
|
||||||
public void start() throws IOException {
|
public void start() throws IOException {
|
||||||
if (httpServer != null) {
|
if (httpServer != null) {
|
||||||
InetSocketAddress infoAddr = DataNode.getInfoAddr(conf);
|
InetSocketAddress infoAddr = DataNode.getInfoAddr(conf);
|
||||||
ChannelFuture f = httpServer.bind(infoAddr);
|
httpAddress = getChannelLocalAddress(httpServer, infoAddr);
|
||||||
try {
|
|
||||||
f.syncUninterruptibly();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
if (e instanceof BindException) {
|
|
||||||
throw NetUtils.wrapException(null, 0, infoAddr.getHostName(),
|
|
||||||
infoAddr.getPort(), (SocketException) e);
|
|
||||||
} else {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
httpAddress = (InetSocketAddress) f.channel().localAddress();
|
|
||||||
LOG.info("Listening HTTP traffic on " + httpAddress);
|
LOG.info("Listening HTTP traffic on " + httpAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -323,23 +311,27 @@ public class DatanodeHttpServer implements Closeable {
|
||||||
NetUtils.createSocketAddr(conf.getTrimmed(
|
NetUtils.createSocketAddr(conf.getTrimmed(
|
||||||
DFS_DATANODE_HTTPS_ADDRESS_KEY,
|
DFS_DATANODE_HTTPS_ADDRESS_KEY,
|
||||||
DFS_DATANODE_HTTPS_ADDRESS_DEFAULT));
|
DFS_DATANODE_HTTPS_ADDRESS_DEFAULT));
|
||||||
ChannelFuture f = httpsServer.bind(secInfoSocAddr);
|
httpsAddress = getChannelLocalAddress(httpsServer, secInfoSocAddr);
|
||||||
|
|
||||||
try {
|
|
||||||
f.syncUninterruptibly();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
if (e instanceof BindException) {
|
|
||||||
throw NetUtils.wrapException(null, 0, secInfoSocAddr.getHostName(),
|
|
||||||
secInfoSocAddr.getPort(), (SocketException) e);
|
|
||||||
} else {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
httpsAddress = (InetSocketAddress) f.channel().localAddress();
|
|
||||||
LOG.info("Listening HTTPS traffic on " + httpsAddress);
|
LOG.info("Listening HTTPS traffic on " + httpsAddress);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private InetSocketAddress getChannelLocalAddress(
|
||||||
|
ServerBootstrap server, InetSocketAddress address) throws IOException {
|
||||||
|
ChannelFuture f = server.bind(address);
|
||||||
|
try {
|
||||||
|
f.syncUninterruptibly();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
if (e instanceof BindException) {
|
||||||
|
throw NetUtils.wrapException(null, 0, address.getHostName(),
|
||||||
|
address.getPort(), (SocketException) e);
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return (InetSocketAddress) f.channel().localAddress();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
bossGroup.shutdownGracefully();
|
bossGroup.shutdownGracefully();
|
||||||
|
|
|
@ -68,7 +68,7 @@ public class TestAbandonBlock {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
/** Abandon a block while creating a file */
|
/* Abandon a block while creating a file */
|
||||||
public void testAbandonBlock() throws IOException {
|
public void testAbandonBlock() throws IOException {
|
||||||
String src = FILE_NAME_PREFIX + "foo";
|
String src = FILE_NAME_PREFIX + "foo";
|
||||||
|
|
||||||
|
@ -105,7 +105,7 @@ public class TestAbandonBlock {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
/** Make sure that the quota is decremented correctly when a block is abandoned */
|
/* Make sure that the quota is decremented correctly when a block is abandoned */
|
||||||
public void testQuotaUpdatedWhenBlockAbandoned() throws IOException {
|
public void testQuotaUpdatedWhenBlockAbandoned() throws IOException {
|
||||||
// Setting diskspace quota to 3MB
|
// Setting diskspace quota to 3MB
|
||||||
fs.setQuota(new Path("/"), HdfsConstants.QUOTA_DONT_SET, 3 * 1024 * 1024);
|
fs.setQuota(new Path("/"), HdfsConstants.QUOTA_DONT_SET, 3 * 1024 * 1024);
|
||||||
|
|
Loading…
Reference in New Issue