HBASE-18307 Share the same EventLoopGroup for NettyRpcServer, NettyRpcClient and AsyncFSWALProvider at RS side

zhangduo 2017-07-10 16:33:37 +08:00
parent 4368f09057
commit 4ab66aca89
18 changed files with 314 additions and 121 deletions
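The change, in short: the region server now creates one Netty EventLoopGroup (plus the matching channel classes) up front, hands it to the RPC server directly, and passes it to the RPC client and the async WAL through Configuration-keyed helpers. A minimal sketch of the wiring, using only the helper APIs visible in this diff (the wrapper class and method names are illustrative, not part of the patch):

import io.netty.channel.EventLoopGroup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;

public class SharedEventLoopWiring {
  public static void wire(Configuration conf) {
    // One group, created once, as early as possible (see the HRegionServer hunk below).
    NettyEventLoopGroupConfig cfg = new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
    EventLoopGroup group = cfg.group();
    // The RPC client and the async WAL look their event loop up from Configuration-keyed
    // registries, so registering the shared group once is enough.
    NettyRpcClientConfigHelper.setEventLoopConfig(conf, group, cfg.clientChannelClass());
    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, group, cfg.clientChannelClass());
    // NettyRpcServer picks the same group up via HRegionServer#getEventLoopGroupConfig().
  }
}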

org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.io.asyncfs;
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;

org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.asyncfs;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 
 import java.io.IOException;
@@ -54,11 +55,11 @@ public final class AsyncFSOutputHelper {
    * implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}.
    */
   public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
-      boolean createParent, short replication, long blockSize, final EventLoop eventLoop)
-      throws IOException {
+      boolean createParent, short replication, long blockSize, EventLoop eventLoop,
+      Class<? extends Channel> channelClass) throws IOException {
     if (fs instanceof DistributedFileSystem) {
       return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
-        overwrite, createParent, replication, blockSize, eventLoop);
+        overwrite, createParent, replication, blockSize, eventLoop, channelClass);
     }
     final FSDataOutputStream fsOut;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,

org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java

@@ -26,6 +26,8 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
@@ -37,6 +39,7 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
 import io.netty.util.concurrent.PromiseCombiner;
@@ -71,8 +74,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.util.DataChecksum;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * An asynchronous HDFS output stream implementation which fans out data to datanode and only
  * supports writing file with only one block.
@@ -461,8 +462,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
     for (int remaining = dataLen; remaining > 0;) {
       int toWriteDataLen = Math.min(remaining, maxDataLen);
-      combiner.add(flushBuffer(buf.readRetainedSlice(toWriteDataLen), nextSubPacketOffsetInBlock,
-        syncBlock));
+      combiner.add((Future<Void>) flushBuffer(buf.readRetainedSlice(toWriteDataLen),
+        nextSubPacketOffsetInBlock, syncBlock));
       nextSubPacketOffsetInBlock += toWriteDataLen;
       remaining -= toWriteDataLen;
     }

org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java

@@ -46,7 +46,6 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoop;
 import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.protobuf.ProtobufDecoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.timeout.IdleStateEvent;
@@ -68,7 +67,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.Encryptor;
-import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemLinkResolver;
 import org.apache.hadoop.fs.Path;
@@ -607,7 +605,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
       String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
-      BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) {
+      BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop,
+      Class<? extends Channel> channelClass) {
     Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
     DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
     boolean connectToDnViaHostname =
@@ -633,7 +632,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       Promise<Channel> promise = eventLoop.newPromise();
       futureList.add(promise);
       String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
-      new Bootstrap().group(eventLoop).channel(NioSocketChannel.class)
+      new Bootstrap().group(eventLoop).channel(channelClass)
          .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
 
             @Override
@@ -672,7 +671,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
       boolean overwrite, boolean createParent, short replication, long blockSize,
-      EventLoop eventLoop) throws IOException {
+      EventLoop eventLoop, Class<? extends Channel> channelClass) throws IOException {
     Configuration conf = dfs.getConf();
     FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
     DFSClient client = dfs.getClient();
@@ -701,7 +700,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         stat.getFileId(), null);
     List<Channel> datanodeList = new ArrayList<>();
     futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
-      PIPELINE_SETUP_CREATE, summer, eventLoop);
+      PIPELINE_SETUP_CREATE, summer, eventLoop, channelClass);
     for (Future<Channel> future : futureList) {
       // fail the creation if there are connection failures since we are fail-fast. The upper
       // layer should retry itself if needed.
@@ -741,14 +740,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
    */
   public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
       boolean overwrite, boolean createParent, short replication, long blockSize,
-      EventLoop eventLoop) throws IOException {
+      EventLoop eventLoop, Class<? extends Channel> channelClass) throws IOException {
     return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
 
       @Override
       public FanOutOneBlockAsyncDFSOutput doCall(Path p)
          throws IOException, UnresolvedLinkException {
         return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
-          blockSize, eventLoop);
+          blockSize, eventLoop, channelClass);
       }
 
       @Override

org/apache/hadoop/hbase/ipc/NettyRpcServer.java

@@ -18,20 +18,19 @@
 package org.apache.hadoop.hbase.ipc;
 
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.ServerChannel;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.FixedLengthFrameDecoder;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.io.IOException;
@@ -47,11 +46,12 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.util.JVM;
+import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
@@ -69,57 +69,44 @@ public class NettyRpcServer extends RpcServer {
   private final Channel serverChannel;
   private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
-  public NettyRpcServer(final Server server, final String name,
-      final List<BlockingServiceAndInterface> services,
-      final InetSocketAddress bindAddress, Configuration conf,
-      RpcScheduler scheduler) throws IOException {
+  public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
+      InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler)
+      throws IOException {
     super(server, name, services, bindAddress, conf, scheduler);
     this.bindAddress = bindAddress;
-    boolean useEpoll = useEpoll(conf);
-    int workerCount = conf.getInt("hbase.netty.rpc.server.worker.count",
-        Runtime.getRuntime().availableProcessors() / 4);
-    EventLoopGroup bossGroup = null;
-    EventLoopGroup workerGroup = null;
-    if (useEpoll) {
-      bossGroup = new EpollEventLoopGroup(1);
-      workerGroup = new EpollEventLoopGroup(workerCount);
+    EventLoopGroup eventLoopGroup;
+    Class<? extends ServerChannel> channelClass;
+    if (server instanceof HRegionServer) {
+      NettyEventLoopGroupConfig config = ((HRegionServer) server).getEventLoopGroupConfig();
+      eventLoopGroup = config.group();
+      channelClass = config.serverChannelClass();
     } else {
-      bossGroup = new NioEventLoopGroup(1);
-      workerGroup = new NioEventLoopGroup(workerCount);
+      eventLoopGroup = new NioEventLoopGroup(0,
+          new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY));
+      channelClass = NioServerSocketChannel.class;
     }
-    ServerBootstrap bootstrap = new ServerBootstrap();
-    bootstrap.group(bossGroup, workerGroup);
-    if (useEpoll) {
-      bootstrap.channel(EpollServerSocketChannel.class);
-    } else {
-      bootstrap.channel(NioServerSocketChannel.class);
-    }
-    bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay);
-    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
-    bootstrap.childOption(ChannelOption.ALLOCATOR,
-        PooledByteBufAllocator.DEFAULT);
-    bootstrap.childHandler(new ChannelInitializer<Channel>() {
-      @Override
-      protected void initChannel(Channel ch) throws Exception {
-        ChannelPipeline pipeline = ch.pipeline();
-        FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
-        preambleDecoder.setSingleDecode(true);
-        pipeline.addLast("preambleDecoder", preambleDecoder);
-        pipeline.addLast("preambleHandler", new NettyRpcServerPreambleHandler(NettyRpcServer.this));
-        pipeline.addLast("frameDecoder",
-            new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, true));
-        pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
-        pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
-      }
-    });
+    ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
+        .childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
+        .childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
+        .childHandler(new ChannelInitializer<Channel>() {
+
+          @Override
+          protected void initChannel(Channel ch) throws Exception {
+            ChannelPipeline pipeline = ch.pipeline();
+            FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
+            preambleDecoder.setSingleDecode(true);
+            pipeline.addLast("preambleDecoder", preambleDecoder);
+            pipeline.addLast("preambleHandler",
+                new NettyRpcServerPreambleHandler(NettyRpcServer.this));
+            pipeline.addLast("frameDecoder",
+                new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, true));
+            pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
+            pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
+          }
+        });
     try {
       serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
-      LOG.info("NettyRpcServer bind to address=" + serverChannel.localAddress()
-          + ", hbase.netty.rpc.server.worker.count=" + workerCount
-          + ", useEpoll=" + useEpoll);
-      allChannels.add(serverChannel);
+      LOG.info("NettyRpcServer bind to address=" + serverChannel.localAddress());
     } catch (InterruptedException e) {
       throw new InterruptedIOException(e.getMessage());
     }
@@ -127,14 +114,6 @@ public class NettyRpcServer extends RpcServer {
     this.scheduler.init(new RpcSchedulerContext(this));
   }
 
-  private static boolean useEpoll(Configuration conf) {
-    // Config to enable native transport.
-    boolean epollEnabled = conf.getBoolean("hbase.rpc.server.nativetransport",
-        true);
-    // Use the faster native epoll transport mechanism on linux if enabled
-    return epollEnabled && JVM.isLinux() && JVM.isAmd64();
-  }
-
   @Override
   public synchronized void start() {
     if (started) {

org/apache/hadoop/hbase/regionserver/HRegionServer.java

@@ -18,6 +18,10 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.Thread.UncaughtExceptionHandler;
@@ -36,8 +40,8 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -106,7 +110,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
-import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -137,7 +141,11 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.*;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@@ -153,9 +161,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServe
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -179,12 +184,14 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.JSONBean;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
+import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
@@ -204,10 +211,6 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
 import sun.misc.Signal;
 import sun.misc.SignalHandler;
@@ -526,6 +529,8 @@ public class HRegionServer extends HasThread implements
   protected FileSystemUtilizationChore fsUtilizationChore;
 
+  private final NettyEventLoopGroupConfig eventLoopGroupConfig;
+
   /**
    * Starts a HRegionServer at the default location.
    */
@@ -541,6 +546,13 @@ public class HRegionServer extends HasThread implements
     super("RegionServer"); // thread name
     this.fsOk = true;
     this.conf = conf;
+    // initialize netty event loop group at the very beginning as we may use it to start rpc server,
+    // rpc client and WAL.
+    this.eventLoopGroupConfig = new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
+    NettyRpcClientConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(),
+      eventLoopGroupConfig.clientChannelClass());
+    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(),
+      eventLoopGroupConfig.clientChannelClass());
     MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
     HFile.checkHFileVersion(this.conf);
     checkCodecs(this.conf);
@@ -3740,4 +3752,8 @@ public class HRegionServer extends HasThread implements
   public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
     return this.rsSpaceQuotaManager;
   }
+
+  public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
+    return eventLoopGroupConfig;
+  }
 }

org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

@@ -24,6 +24,7 @@ import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.Sequence;
 import com.lmax.disruptor.Sequencer;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 import io.netty.util.concurrent.SingleThreadEventExecutor;
 
@@ -144,6 +145,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final EventLoop eventLoop;
 
+  private final Class<? extends Channel> channelClass;
+
   private final Lock consumeLock = new ReentrantLock();
 
   private final Runnable consumer = this::consume;
@@ -192,10 +195,11 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
       Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
-      String prefix, String suffix, EventLoop eventLoop)
+      String prefix, String suffix, EventLoop eventLoop, Class<? extends Channel> channelClass)
       throws FailedLogCloseException, IOException {
     super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
     this.eventLoop = eventLoop;
+    this.channelClass = channelClass;
     Supplier<Boolean> hasConsumerTask;
     if (eventLoop instanceof SingleThreadEventExecutor) {
@@ -607,7 +611,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     boolean overwrite = false;
     for (int retry = 0;; retry++) {
       try {
-        return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoop);
+        return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoop,
+          channelClass);
       } catch (RemoteException e) {
         LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
         if (shouldRetryCreate(e)) {

org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import com.google.common.base.Throwables;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 
 import java.io.IOException;
@@ -54,6 +55,8 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
 
   private final EventLoop eventLoop;
 
+  private final Class<? extends Channel> channelClass;
+
   private AsyncFSOutput output;
 
   private static final class OutputStreamWrapper extends OutputStream
@@ -99,8 +102,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
 
   private OutputStream asyncOutputWrapper;
 
-  public AsyncProtobufLogWriter(EventLoop eventLoop) {
+  public AsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) {
     this.eventLoop = eventLoop;
+    this.channelClass = channelClass;
   }
 
   @Override
@@ -151,7 +155,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
       short replication, long blockSize) throws IOException {
     this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
-      blockSize, eventLoop);
+      blockSize, eventLoop, channelClass);
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }

org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.crypto.Encryptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
@@ -32,8 +33,8 @@ public class SecureAsyncProtobufLogWriter extends AsyncProtobufLogWriter {
 
   private Encryptor encryptor = null;
 
-  public SecureAsyncProtobufLogWriter(EventLoop eventLoop) {
-    super(eventLoop);
+  public SecureAsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) {
+    super(eventLoop, channelClass);
   }
 
   @Override

org/apache/hadoop/hbase/util/NettyEventLoopGroupConfig.java (new file)

@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Event loop group related config.
+ */
+@InterfaceAudience.Private
+public class NettyEventLoopGroupConfig {
+
+  private final EventLoopGroup group;
+
+  private final Class<? extends ServerChannel> serverChannelClass;
+
+  private final Class<? extends Channel> clientChannelClass;
+
+  private static boolean useEpoll(Configuration conf) {
+    // Config to enable native transport.
+    boolean epollEnabled = conf.getBoolean("hbase.netty.nativetransport", true);
+    // Use the faster native epoll transport mechanism on linux if enabled
+    return epollEnabled && JVM.isLinux() && JVM.isAmd64();
+  }
+
+  public NettyEventLoopGroupConfig(Configuration conf, String threadPoolName) {
+    boolean useEpoll = useEpoll(conf);
+    int workerCount = conf.getInt("hbase.netty.worker.count", 0);
+    ThreadFactory eventLoopThreadFactory =
+        new DefaultThreadFactory(threadPoolName, true, Thread.MAX_PRIORITY);
+    if (useEpoll) {
+      group = new EpollEventLoopGroup(workerCount, eventLoopThreadFactory);
+      serverChannelClass = EpollServerSocketChannel.class;
+      clientChannelClass = EpollSocketChannel.class;
+    } else {
+      group = new NioEventLoopGroup(workerCount, eventLoopThreadFactory);
+      serverChannelClass = NioServerSocketChannel.class;
+      clientChannelClass = NioSocketChannel.class;
+    }
+  }
+
+  public EventLoopGroup group() {
+    return group;
+  }
+
+  public Class<? extends ServerChannel> serverChannelClass() {
+    return serverChannelClass;
+  }
+
+  public Class<? extends Channel> clientChannelClass() {
+    return clientChannelClass;
+  }
+}
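A note on the defaults above: "hbase.netty.worker.count" defaults to 0, and Netty treats a thread count of 0 as "use my own default" (the io.netty.eventLoopThreads system property, normally twice the number of available processors). A small standalone sketch of the class, with a hypothetical pool name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;

public class EventLoopConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    NettyEventLoopGroupConfig cfg = new NettyEventLoopGroupConfig(conf, "Demo-EventLoopGroup");
    // Reports the epoll channel classes on Linux/amd64 with native transport enabled,
    // the NIO ones everywhere else.
    System.out.println(cfg.serverChannelClass().getSimpleName());
    System.out.println(cfg.clientChannelClass().getSimpleName());
    cfg.group().shutdownGracefully();
  }
}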

org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java

@@ -19,9 +19,12 @@ package org.apache.hadoop.hbase.wal;
 import com.google.common.base.Throwables;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
@@ -36,7 +39,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * A WAL provider that use {@link AsyncFSWAL}.
@@ -52,31 +55,43 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
     void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
   }
 
-  private EventLoopGroup eventLoopGroup = null;
+  private EventLoopGroup eventLoopGroup;
+
+  private Class<? extends Channel> channelClass;
 
   @Override
   protected AsyncFSWAL createWAL() throws IOException {
     return new AsyncFSWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
        getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
        true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
-       eventLoopGroup.next());
+       eventLoopGroup.next(), channelClass);
   }
 
   @Override
   protected void doInit(Configuration conf) throws IOException {
-    eventLoopGroup = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("AsyncFSWAL"));
+    Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
+        NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
+    if (eventLoopGroupAndChannelClass != null) {
+      eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
+      channelClass = eventLoopGroupAndChannelClass.getSecond();
+    } else {
+      eventLoopGroup = new NioEventLoopGroup(1,
+          new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY));
+      channelClass = NioSocketChannel.class;
+    }
   }
 
  /**
   * public because of AsyncFSWAL. Should be package-private
   */
  public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
-      boolean overwritable, EventLoop eventLoop) throws IOException {
+      boolean overwritable, EventLoop eventLoop, Class<? extends Channel> channelClass)
+      throws IOException {
    // Configuration already does caching for the Class lookup.
    Class<? extends AsyncWriter> logWriterClass = conf.getClass(
      "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class);
    try {
-     AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class).newInstance(eventLoop);
+     AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class, Class.class)
+         .newInstance(eventLoop, channelClass);
      writer.init(fs, path, conf, overwritable);
      return writer;
    } catch (Exception e) {
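Note that the reflective lookup above now requires a public (EventLoop, Class) constructor on any writer plugged in via "hbase.regionserver.hlog.async.writer.impl". A hypothetical custom writer that satisfies the new contract (TracingAsyncWriter is not part of this patch):

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;

public class TracingAsyncWriter extends AsyncProtobufLogWriter {
  // Matches getConstructor(EventLoop.class, Class.class) in AsyncFSWALProvider.
  public TracingAsyncWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) {
    super(eventLoop, channelClass);
  }
}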

org/apache/hadoop/hbase/wal/NettyAsyncFSWALConfigHelper.java (new file)

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * Helper class for passing netty event loop config to {@link AsyncFSWALProvider}.
+ */
+public class NettyAsyncFSWALConfigHelper {
+
+  private static final String EVENT_LOOP_CONFIG = "hbase.wal.async.event-loop.config";
+
+  private static final String CONFIG_NAME = "global-event-loop";
+
+  private static final Map<String, Pair<EventLoopGroup, Class<? extends Channel>>> EVENT_LOOP_CONFIG_MAP =
+      new HashMap<>();
+
+  /**
+   * Set the EventLoopGroup and channel class for {@code AsyncFSWALProvider}.
+   */
+  public static void setEventLoopConfig(Configuration conf, EventLoopGroup group,
+      Class<? extends Channel> channelClass) {
+    Preconditions.checkNotNull(group, "group is null");
+    Preconditions.checkNotNull(channelClass, "channel class is null");
+    conf.set(EVENT_LOOP_CONFIG, CONFIG_NAME);
+    EVENT_LOOP_CONFIG_MAP.put(CONFIG_NAME,
+      Pair.<EventLoopGroup, Class<? extends Channel>> newPair(group, channelClass));
+  }
+
+  static Pair<EventLoopGroup, Class<? extends Channel>> getEventLoopConfig(Configuration conf) {
+    String name = conf.get(EVENT_LOOP_CONFIG);
+    if (StringUtils.isBlank(name)) {
+      return null;
+    }
+    return EVENT_LOOP_CONFIG_MAP.get(name);
+  }
+}
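The helper keeps the actual objects in a static map and only writes the lookup key into the Configuration, since an EventLoopGroup cannot be serialized into a config value. A minimal sketch of how a caller registers a group (the surrounding class is illustrative):

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;

public class WALConfigRoundTrip {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    EventLoopGroup group = new NioEventLoopGroup(1);
    // Stores the pair under the fixed "global-event-loop" key and stamps that key into conf,
    // so any AsyncFSWALProvider initialized from this conf reuses the group instead of
    // creating its own (see AsyncFSWALProvider.doInit above).
    NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, group, NioSocketChannel.class);
    group.shutdownGracefully();
  }
}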

org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java

@@ -18,14 +18,17 @@
 package org.apache.hadoop.hbase.io.asyncfs;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -64,6 +67,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
 
   private static EventLoopGroup EVENT_LOOP_GROUP;
 
+  private static Class<? extends Channel> CHANNEL_CLASS;
+
   private static int READ_TIMEOUT_MS = 2000;
 
   @Rule
@@ -75,6 +80,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     TEST_UTIL.startMiniDFSCluster(3);
     FS = TEST_UTIL.getDFSCluster().getFileSystem();
     EVENT_LOOP_GROUP = new NioEventLoopGroup();
+    CHANNEL_CLASS = NioSocketChannel.class;
   }
 
   @AfterClass
@@ -91,9 +97,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     // will fail.
     for (;;) {
       try {
-        FanOutOneBlockAsyncDFSOutput out =
-          FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
-            true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP.next());
+        FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
+          new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(),
+          EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
         out.close();
         break;
       } catch (IOException e) {
@@ -122,8 +128,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   public void test() throws IOException, InterruptedException, ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
+    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     writeAndVerify(eventLoop, FS, f, out);
   }
 
@@ -131,8 +137,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   public void testMaxByteBufAllocated() throws Exception {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
+    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     out.guess(5 * 1024);
     assertEquals(8 * 1024, out.guess(5 * 1024));
     assertEquals(16 * 1024, out.guess(10 * 1024));
@@ -146,9 +152,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   public void testRecover() throws IOException, InterruptedException, ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
-    final byte[] b = new byte[10];
+    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+    byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b, 0, b.length);
     out.flush(false).get();
@@ -179,8 +185,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
+    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     Thread.sleep(READ_TIMEOUT_MS * 2);
     // the connection to datanode should still alive.
     writeAndVerify(eventLoop, FS, f, out);
@@ -195,11 +201,11 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     try {
       FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-        FS.getDefaultBlockSize(), eventLoop);
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
       fail("should fail with parent does not exist");
     } catch (RemoteException e) {
       LOG.info("expected exception caught", e);
-      assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException);
+      assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
     }
   }
 
@@ -220,7 +226,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     try {
       FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-        FS.getDefaultBlockSize(), eventLoop);
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
       fail("should fail with connection error");
     } catch (IOException e) {
       LOG.info("expected exception caught", e);
@@ -239,8 +245,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
-      true, false, (short) 3, 1024 * 1024 * 1024, eventLoop);
+    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
     byte[] b = new byte[50 * 1024 * 1024];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b);

org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java

@@ -20,8 +20,10 @@ package org.apache.hadoop.hbase.io.asyncfs;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
@@ -31,8 +33,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
-import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
-import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.AfterClass;
@@ -44,6 +44,8 @@ public class TestLocalAsyncOutput {
 
   private static EventLoopGroup GROUP = new NioEventLoopGroup();
 
+  private static Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;
+
   private static final HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility();
 
   @AfterClass
@@ -57,7 +59,7 @@ public class TestLocalAsyncOutput {
     Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
     FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
     AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
-      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next());
+      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next(), CHANNEL_CLASS);
     byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b);

org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java

@@ -31,9 +31,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIP
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.File;
 import java.io.IOException;
@@ -83,6 +85,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
 
   private static EventLoopGroup EVENT_LOOP_GROUP;
 
+  private static Class<? extends Channel> CHANNEL_CLASS;
+
   private static int READ_TIMEOUT_MS = 200000;
 
   private static final File KEYTAB_FILE =
@@ -166,6 +170,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     EVENT_LOOP_GROUP = new NioEventLoopGroup();
+    CHANNEL_CLASS = NioSocketChannel.class;
     TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
     KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
     USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
@@ -242,8 +247,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
   public void test() throws IOException, InterruptedException, ExecutionException {
     Path f = getTestFile();
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
-      true, false, (short) 1, FS.getDefaultBlockSize(), eventLoop);
+    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
+      false, (short) 1, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, f, out);
   }
 }

org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java

@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
 import java.util.List;
@@ -41,9 +43,12 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
 
   private static EventLoopGroup GROUP;
 
+  private static Class<? extends Channel> CHANNEL_CLASS;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncFSWAL"));
+    CHANNEL_CLASS = NioSocketChannel.class;
     AbstractTestFSWAL.setUpBeforeClass();
   }
 
@@ -58,7 +63,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
       Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
       String prefix, String suffix) throws IOException {
     return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
-        suffix, GROUP.next());
+        suffix, GROUP.next(), CHANNEL_CLASS);
   }
 
   @Override
@@ -67,7 +72,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
       boolean failIfWALExists, String prefix, String suffix, final Runnable action)
       throws IOException {
     return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
-        suffix, GROUP.next()) {
+        suffix, GROUP.next(), CHANNEL_CLASS) {
 
       @Override
       void atHeadOfRingBufferEventHandlerAppend() {

org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java

@@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import com.google.common.base.Throwables;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -42,9 +44,12 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As
 
   private static EventLoopGroup EVENT_LOOP_GROUP;
 
+  private static Class<? extends Channel> CHANNEL_CLASS;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     EVENT_LOOP_GROUP = new NioEventLoopGroup();
+    CHANNEL_CLASS = NioSocketChannel.class;
     AbstractTestProtobufLog.setUpBeforeClass();
   }
 
@@ -57,7 +62,7 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As
   @Override
   protected AsyncWriter createWriter(Path path) throws IOException {
     return AsyncFSWALProvider.createAsyncWriter(TEST_UTIL.getConfiguration(), fs, path, false,
-      EVENT_LOOP_GROUP.next());
+      EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
   }
 
   @Override

org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java

@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
@@ -40,9 +42,12 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
 
   private static EventLoopGroup GROUP;
 
+  private static Class<? extends Channel> CHANNEL_CLASS;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncWALReplay"));
+    CHANNEL_CLASS = NioSocketChannel.class;
     Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
     conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
     AbstractTestWALReplay.setUpBeforeClass();
@@ -57,6 +62,6 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
   @Override
   protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
     return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
-        HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next());
+        HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next(), CHANNEL_CLASS);
   }
 }