HBASE-18307 Share the same EventLoopGroup for NettyRpcServer, NettyRpcClient and AsyncFSWALProvider at RS side

This commit is contained in:
zhangduo 2017-07-10 16:33:37 +08:00
parent 1ddcc07d65
commit 351703455a
18 changed files with 314 additions and 121 deletions

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.io.asyncfs;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.classification.InterfaceAudience;

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.asyncfs;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import java.io.IOException;
@ -54,11 +55,11 @@ public final class AsyncFSOutputHelper {
* implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}.
*/
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
boolean createParent, short replication, long blockSize, final EventLoop eventLoop)
throws IOException {
boolean createParent, short replication, long blockSize, EventLoop eventLoop,
Class<? extends Channel> channelClass) throws IOException {
if (fs instanceof DistributedFileSystem) {
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
overwrite, createParent, replication, blockSize, eventLoop);
overwrite, createParent, replication, blockSize, eventLoop, channelClass);
}
final FSDataOutputStream fsOut;
int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,

View File

@ -26,6 +26,8 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
@ -37,6 +39,7 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseCombiner;
@ -71,8 +74,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
/**
* An asynchronous HDFS output stream implementation which fans out data to datanode and only
* supports writing file with only one block.
@ -461,8 +462,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
for (int remaining = dataLen; remaining > 0;) {
int toWriteDataLen = Math.min(remaining, maxDataLen);
combiner.add(flushBuffer(buf.readRetainedSlice(toWriteDataLen), nextSubPacketOffsetInBlock,
syncBlock));
combiner.add((Future<Void>) flushBuffer(buf.readRetainedSlice(toWriteDataLen),
nextSubPacketOffsetInBlock, syncBlock));
nextSubPacketOffsetInBlock += toWriteDataLen;
remaining -= toWriteDataLen;
}

View File

@ -46,7 +46,6 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.timeout.IdleStateEvent;
@ -68,7 +67,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
@ -607,7 +605,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) {
BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop,
Class<? extends Channel> channelClass) {
Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
boolean connectToDnViaHostname =
@ -633,7 +632,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
Promise<Channel> promise = eventLoop.newPromise();
futureList.add(promise);
String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
new Bootstrap().group(eventLoop).channel(NioSocketChannel.class)
new Bootstrap().group(eventLoop).channel(channelClass)
.option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
@Override
@ -672,7 +671,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoop eventLoop) throws IOException {
EventLoop eventLoop, Class<? extends Channel> channelClass) throws IOException {
Configuration conf = dfs.getConf();
FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
DFSClient client = dfs.getClient();
@ -701,7 +700,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
stat.getFileId(), null);
List<Channel> datanodeList = new ArrayList<>();
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
PIPELINE_SETUP_CREATE, summer, eventLoop);
PIPELINE_SETUP_CREATE, summer, eventLoop, channelClass);
for (Future<Channel> future : futureList) {
// fail the creation if there are connection failures since we are fail-fast. The upper
// layer should retry itself if needed.
@ -741,14 +740,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
*/
public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoop eventLoop) throws IOException {
EventLoop eventLoop, Class<? extends Channel> channelClass) throws IOException {
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
@Override
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
throws IOException, UnresolvedLinkException {
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
blockSize, eventLoop);
blockSize, eventLoop, channelClass);
}
@Override

View File

@ -18,20 +18,19 @@
package org.apache.hadoop.hbase.ipc;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.ServerChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
@ -47,11 +46,12 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
@ -69,57 +69,44 @@ public class NettyRpcServer extends RpcServer {
private final Channel serverChannel;
private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public NettyRpcServer(final Server server, final String name,
final List<BlockingServiceAndInterface> services,
final InetSocketAddress bindAddress, Configuration conf,
RpcScheduler scheduler) throws IOException {
public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler)
throws IOException {
super(server, name, services, bindAddress, conf, scheduler);
this.bindAddress = bindAddress;
boolean useEpoll = useEpoll(conf);
int workerCount = conf.getInt("hbase.netty.rpc.server.worker.count",
Runtime.getRuntime().availableProcessors() / 4);
EventLoopGroup bossGroup = null;
EventLoopGroup workerGroup = null;
if (useEpoll) {
bossGroup = new EpollEventLoopGroup(1);
workerGroup = new EpollEventLoopGroup(workerCount);
EventLoopGroup eventLoopGroup;
Class<? extends ServerChannel> channelClass;
if (server instanceof HRegionServer) {
NettyEventLoopGroupConfig config = ((HRegionServer) server).getEventLoopGroupConfig();
eventLoopGroup = config.group();
channelClass = config.serverChannelClass();
} else {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup(workerCount);
eventLoopGroup = new NioEventLoopGroup(0,
new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY));
channelClass = NioServerSocketChannel.class;
}
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
if (useEpoll) {
bootstrap.channel(EpollServerSocketChannel.class);
} else {
bootstrap.channel(NioServerSocketChannel.class);
}
bootstrap.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive);
bootstrap.childOption(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT);
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
preambleDecoder.setSingleDecode(true);
pipeline.addLast("preambleDecoder", preambleDecoder);
pipeline.addLast("preambleHandler", new NettyRpcServerPreambleHandler(NettyRpcServer.this));
pipeline.addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, true));
pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
}
});
ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
preambleDecoder.setSingleDecode(true);
pipeline.addLast("preambleDecoder", preambleDecoder);
pipeline.addLast("preambleHandler",
new NettyRpcServerPreambleHandler(NettyRpcServer.this));
pipeline.addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(maxRequestSize, 0, 4, 0, 4, true));
pipeline.addLast("decoder", new NettyRpcServerRequestDecoder(allChannels, metrics));
pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
}
});
try {
serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
LOG.info("NettyRpcServer bind to address=" + serverChannel.localAddress()
+ ", hbase.netty.rpc.server.worker.count=" + workerCount
+ ", useEpoll=" + useEpoll);
allChannels.add(serverChannel);
LOG.info("NettyRpcServer bind to address=" + serverChannel.localAddress());
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
}
@ -127,14 +114,6 @@ public class NettyRpcServer extends RpcServer {
this.scheduler.init(new RpcSchedulerContext(this));
}
private static boolean useEpoll(Configuration conf) {
// Config to enable native transport.
boolean epollEnabled = conf.getBoolean("hbase.rpc.server.nativetransport",
true);
// Use the faster native epoll transport mechanism on linux if enabled
return epollEnabled && JVM.isLinux() && JVM.isAmd64();
}
@Override
public synchronized void start() {
if (started) {

View File

@ -18,6 +18,10 @@
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.Thread.UncaughtExceptionHandler;
@ -36,8 +40,8 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@ -106,7 +110,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.NettyRpcClientConfigHelper;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@ -137,7 +141,11 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.*;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@ -153,9 +161,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServe
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@ -179,12 +184,14 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.JSONBean;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.NettyAsyncFSWALConfigHelper;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
@ -204,10 +211,6 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import sun.misc.Signal;
import sun.misc.SignalHandler;
@ -526,6 +529,8 @@ public class HRegionServer extends HasThread implements
protected FileSystemUtilizationChore fsUtilizationChore;
private final NettyEventLoopGroupConfig eventLoopGroupConfig;
/**
* Starts a HRegionServer at the default location.
*/
@ -541,6 +546,13 @@ public class HRegionServer extends HasThread implements
super("RegionServer"); // thread name
this.fsOk = true;
this.conf = conf;
// initialize netty event loop group at the very beginning as we may use it to start rpc server,
// rpc client and WAL.
this.eventLoopGroupConfig = new NettyEventLoopGroupConfig(conf, "RS-EventLoopGroup");
NettyRpcClientConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(),
eventLoopGroupConfig.clientChannelClass());
NettyAsyncFSWALConfigHelper.setEventLoopConfig(conf, eventLoopGroupConfig.group(),
eventLoopGroupConfig.clientChannelClass());
MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
HFile.checkHFileVersion(this.conf);
checkCodecs(this.conf);
@ -3740,4 +3752,8 @@ public class HRegionServer extends HasThread implements
public RegionServerSpaceQuotaManager getRegionServerSpaceQuotaManager() {
return this.rsSpaceQuotaManager;
}
public NettyEventLoopGroupConfig getEventLoopGroupConfig() {
return eventLoopGroupConfig;
}
}

View File

@ -24,6 +24,7 @@ import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.SingleThreadEventExecutor;
@ -144,6 +145,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
private final EventLoop eventLoop;
private final Class<? extends Channel> channelClass;
private final Lock consumeLock = new ReentrantLock();
private final Runnable consumer = this::consume;
@ -192,10 +195,11 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix, EventLoop eventLoop)
String prefix, String suffix, EventLoop eventLoop, Class<? extends Channel> channelClass)
throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
this.eventLoop = eventLoop;
this.channelClass = channelClass;
Supplier<Boolean> hasConsumerTask;
if (eventLoop instanceof SingleThreadEventExecutor) {
@ -607,7 +611,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
boolean overwrite = false;
for (int retry = 0;; retry++) {
try {
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoop);
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoop,
channelClass);
} catch (RemoteException e) {
LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
if (shouldRetryCreate(e)) {

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import com.google.common.base.Throwables;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import java.io.IOException;
@ -54,6 +55,8 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
private final EventLoop eventLoop;
private final Class<? extends Channel> channelClass;
private AsyncFSOutput output;
private static final class OutputStreamWrapper extends OutputStream
@ -99,8 +102,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
private OutputStream asyncOutputWrapper;
public AsyncProtobufLogWriter(EventLoop eventLoop) {
public AsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) {
this.eventLoop = eventLoop;
this.channelClass = channelClass;
}
@Override
@ -151,7 +155,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize) throws IOException {
this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
blockSize, eventLoop);
blockSize, eventLoop, channelClass);
this.asyncOutputWrapper = new OutputStreamWrapper(output);
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.crypto.Encryptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
@ -32,8 +33,8 @@ public class SecureAsyncProtobufLogWriter extends AsyncProtobufLogWriter {
private Encryptor encryptor = null;
public SecureAsyncProtobufLogWriter(EventLoop eventLoop) {
super(eventLoop);
public SecureAsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) {
super(eventLoop, channelClass);
}
@Override

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Event loop group related config.
*/
@InterfaceAudience.Private
public class NettyEventLoopGroupConfig {
private final EventLoopGroup group;
private final Class<? extends ServerChannel> serverChannelClass;
private final Class<? extends Channel> clientChannelClass;
private static boolean useEpoll(Configuration conf) {
// Config to enable native transport.
boolean epollEnabled = conf.getBoolean("hbase.netty.nativetransport", true);
// Use the faster native epoll transport mechanism on linux if enabled
return epollEnabled && JVM.isLinux() && JVM.isAmd64();
}
public NettyEventLoopGroupConfig(Configuration conf, String threadPoolName) {
boolean useEpoll = useEpoll(conf);
int workerCount = conf.getInt("hbase.netty.worker.count", 0);
ThreadFactory eventLoopThreadFactory =
new DefaultThreadFactory(threadPoolName, true, Thread.MAX_PRIORITY);
if (useEpoll) {
group = new EpollEventLoopGroup(workerCount, eventLoopThreadFactory);
serverChannelClass = EpollServerSocketChannel.class;
clientChannelClass = EpollSocketChannel.class;
} else {
group = new NioEventLoopGroup(workerCount, eventLoopThreadFactory);
serverChannelClass = NioServerSocketChannel.class;
clientChannelClass = NioSocketChannel.class;
}
}
public EventLoopGroup group() {
return group;
}
public Class<? extends ServerChannel> serverChannelClass() {
return serverChannelClass;
}
public Class<? extends Channel> clientChannelClass() {
return clientChannelClass;
}
}

View File

@ -19,9 +19,12 @@ package org.apache.hadoop.hbase.wal;
import com.google.common.base.Throwables;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
@ -36,7 +39,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Pair;
/**
* A WAL provider that use {@link AsyncFSWAL}.
@ -52,31 +55,43 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
}
private EventLoopGroup eventLoopGroup = null;
private EventLoopGroup eventLoopGroup;
private Class<? extends Channel> channelClass;
@Override
protected AsyncFSWAL createWAL() throws IOException {
return new AsyncFSWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
eventLoopGroup.next());
eventLoopGroup.next(), channelClass);
}
@Override
protected void doInit(Configuration conf) throws IOException {
eventLoopGroup = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("AsyncFSWAL"));
Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass =
NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
if (eventLoopGroupAndChannelClass != null) {
eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
channelClass = eventLoopGroupAndChannelClass.getSecond();
} else {
eventLoopGroup = new NioEventLoopGroup(1,
new DefaultThreadFactory("AsyncFSWAL", true, Thread.MAX_PRIORITY));
channelClass = NioSocketChannel.class;
}
}
/**
* public because of AsyncFSWAL. Should be package-private
*/
public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
boolean overwritable, EventLoop eventLoop) throws IOException {
boolean overwritable, EventLoop eventLoop, Class<? extends Channel> channelClass)
throws IOException {
// Configuration already does caching for the Class lookup.
Class<? extends AsyncWriter> logWriterClass = conf.getClass(
"hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class);
try {
AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class).newInstance(eventLoop);
AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class, Class.class)
.newInstance(eventLoop, channelClass);
writer.init(fs, path, conf, overwritable);
return writer;
} catch (Exception e) {

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Pair;
/**
* Helper class for passing netty event loop config to {@link AsyncFSWALProvider}.
*/
public class NettyAsyncFSWALConfigHelper {
private static final String EVENT_LOOP_CONFIG = "hbase.wal.async.event-loop.config";
private static final String CONFIG_NAME = "global-event-loop";
private static final Map<String, Pair<EventLoopGroup, Class<? extends Channel>>> EVENT_LOOP_CONFIG_MAP =
new HashMap<>();
/**
* Set the EventLoopGroup and channel class for {@code AsyncFSWALProvider}.
*/
public static void setEventLoopConfig(Configuration conf, EventLoopGroup group,
Class<? extends Channel> channelClass) {
Preconditions.checkNotNull(group, "group is null");
Preconditions.checkNotNull(channelClass, "channel class is null");
conf.set(EVENT_LOOP_CONFIG, CONFIG_NAME);
EVENT_LOOP_CONFIG_MAP.put(CONFIG_NAME,
Pair.<EventLoopGroup, Class<? extends Channel>> newPair(group, channelClass));
}
static Pair<EventLoopGroup, Class<? extends Channel>> getEventLoopConfig(Configuration conf) {
String name = conf.get(EVENT_LOOP_CONFIG);
if (StringUtils.isBlank(name)) {
return null;
}
return EVENT_LOOP_CONFIG_MAP.get(name);
}
}

View File

@ -18,14 +18,17 @@
package org.apache.hadoop.hbase.io.asyncfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -64,6 +67,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
private static EventLoopGroup EVENT_LOOP_GROUP;
private static Class<? extends Channel> CHANNEL_CLASS;
private static int READ_TIMEOUT_MS = 2000;
@Rule
@ -75,6 +80,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
TEST_UTIL.startMiniDFSCluster(3);
FS = TEST_UTIL.getDFSCluster().getFileSystem();
EVENT_LOOP_GROUP = new NioEventLoopGroup();
CHANNEL_CLASS = NioSocketChannel.class;
}
@AfterClass
@ -91,9 +97,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
// will fail.
for (;;) {
try {
FanOutOneBlockAsyncDFSOutput out =
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP.next());
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(),
EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
out.close();
break;
} catch (IOException e) {
@ -122,8 +128,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
public void test() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
writeAndVerify(eventLoop, FS, f, out);
}
@ -131,8 +137,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
public void testMaxByteBufAllocated() throws Exception {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
out.guess(5 * 1024);
assertEquals(8 * 1024, out.guess(5 * 1024));
assertEquals(16 * 1024, out.guess(10 * 1024));
@ -146,9 +152,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
public void testRecover() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
final byte[] b = new byte[10];
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
out.write(b, 0, b.length);
out.flush(false).get();
@ -179,8 +185,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
public void testHeartbeat() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
Thread.sleep(READ_TIMEOUT_MS * 2);
// the connection to datanode should still alive.
writeAndVerify(eventLoop, FS, f, out);
@ -195,11 +201,11 @@ public class TestFanOutOneBlockAsyncDFSOutput {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try {
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
FS.getDefaultBlockSize(), eventLoop);
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
fail("should fail with parent does not exist");
} catch (RemoteException e) {
LOG.info("expected exception caught", e);
assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException);
assertThat(e.unwrapRemoteException(), instanceOf(FileNotFoundException.class));
}
}
@ -220,7 +226,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try {
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
FS.getDefaultBlockSize(), eventLoop);
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
fail("should fail with connection error");
} catch (IOException e) {
LOG.info("expected exception caught", e);
@ -239,8 +245,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
true, false, (short) 3, 1024 * 1024 * 1024, eventLoop);
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
byte[] b = new byte[50 * 1024 * 1024];
ThreadLocalRandom.current().nextBytes(b);
out.write(b);

View File

@ -20,8 +20,10 @@ package org.apache.hadoop.hbase.io.asyncfs;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
@ -31,8 +33,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.AfterClass;
@ -44,6 +44,8 @@ public class TestLocalAsyncOutput {
private static EventLoopGroup GROUP = new NioEventLoopGroup();
private static Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;
private static final HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility();
@AfterClass
@ -57,7 +59,7 @@ public class TestLocalAsyncOutput {
Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next());
fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next(), CHANNEL_CLASS);
byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
out.write(b);

View File

@ -31,9 +31,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIP
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.File;
import java.io.IOException;
@ -83,6 +85,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
private static EventLoopGroup EVENT_LOOP_GROUP;
private static Class<? extends Channel> CHANNEL_CLASS;
private static int READ_TIMEOUT_MS = 200000;
private static final File KEYTAB_FILE =
@ -166,6 +170,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
EVENT_LOOP_GROUP = new NioEventLoopGroup();
CHANNEL_CLASS = NioSocketChannel.class;
TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS);
KDC = TEST_UTIL.setupMiniKdc(KEYTAB_FILE);
USERNAME = UserGroupInformation.getLoginUser().getShortUserName();
@ -242,8 +247,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
public void test() throws IOException, InterruptedException, ExecutionException {
Path f = getTestFile();
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
true, false, (short) 1, FS.getDefaultBlockSize(), eventLoop);
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 1, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, f, out);
}
}

View File

@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.util.List;
@ -41,9 +43,12 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
private static EventLoopGroup GROUP;
private static Class<? extends Channel> CHANNEL_CLASS;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncFSWAL"));
CHANNEL_CLASS = NioSocketChannel.class;
AbstractTestFSWAL.setUpBeforeClass();
}
@ -58,7 +63,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix) throws IOException {
return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix, GROUP.next());
suffix, GROUP.next(), CHANNEL_CLASS);
}
@Override
@ -67,7 +72,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
boolean failIfWALExists, String prefix, String suffix, final Runnable action)
throws IOException {
return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
suffix, GROUP.next()) {
suffix, GROUP.next(), CHANNEL_CLASS) {
@Override
void atHeadOfRingBufferEventHandlerAppend() {

View File

@ -19,8 +19,10 @@ package org.apache.hadoop.hbase.regionserver.wal;
import com.google.common.base.Throwables;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.io.InterruptedIOException;
@ -42,9 +44,12 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As
private static EventLoopGroup EVENT_LOOP_GROUP;
private static Class<? extends Channel> CHANNEL_CLASS;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
EVENT_LOOP_GROUP = new NioEventLoopGroup();
CHANNEL_CLASS = NioSocketChannel.class;
AbstractTestProtobufLog.setUpBeforeClass();
}
@ -57,7 +62,7 @@ public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.As
@Override
protected AsyncWriter createWriter(Path path) throws IOException {
return AsyncFSWALProvider.createAsyncWriter(TEST_UTIL.getConfiguration(), fs, path, false,
EVENT_LOOP_GROUP.next());
EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
}
@Override

View File

@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
@ -40,9 +42,12 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
private static EventLoopGroup GROUP;
private static Class<? extends Channel> CHANNEL_CLASS;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncWALReplay"));
CHANNEL_CLASS = NioSocketChannel.class;
Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
AbstractTestWALReplay.setUpBeforeClass();
@ -57,6 +62,6 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
@Override
protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next());
HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next(), CHANNEL_CLASS);
}
}