HBASE-26347 Support detect and exclude slow DNs in fan-out of WAL (#3800)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Xiaolin Ha 2022-01-06 11:45:38 +08:00 committed by haxiaolin
parent 2b05e68ae1
commit 76bc412fc4
18 changed files with 592 additions and 72 deletions

View File

@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -47,11 +48,11 @@ public final class AsyncFSOutputHelper {
    */
   public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
       boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
-      Class<? extends Channel> channelClass)
+      Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
       throws IOException, CommonFSUtils.StreamLacksCapabilityException {
     if (fs instanceof DistributedFileSystem) {
       return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
-        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
+        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor);
     }
     final FSDataOutputStream out;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
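For orientation, here is a minimal sketch (not part of this patch) of how a caller might pass the new StreamSlowMonitor parameter; the wrapper class, method, and the monitor name "walMonitor" are invented for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;

final class CreateOutputSketch {
  // Hypothetical helper: all parameters are supplied by the caller.
  static AsyncFSOutput open(FileSystem fs, Path path, Configuration conf, EventLoopGroup group,
      Class<? extends Channel> channelClass) throws Exception {
    // Each WAL stream gets a monitor; slow-datanode samples are reported through it.
    StreamSlowMonitor monitor = StreamSlowMonitor.create(conf, "walMonitor");
    return AsyncFSOutputHelper.createOutput(fs, path, true /* overwrite */,
      false /* createParent */, fs.getDefaultReplication(path), fs.getDefaultBlockSize(path),
      group, channelClass, monitor);
  }
}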

View File

@@ -32,7 +32,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,7 +45,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.Encryptor;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -68,6 +70,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
 import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
@@ -121,7 +124,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   private final Encryptor encryptor;
 
-  private final List<Channel> datanodeList;
+  private final Map<Channel, DatanodeInfo> datanodeInfoMap;
 
   private final DataChecksum summer;
@@ -137,17 +140,22 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     // should be backed by a thread safe collection
     private final Set<ChannelId> unfinishedReplicas;
+    private final long packetDataLen;
+    private final long flushTimestamp;
+    private long lastAckTimestamp = -1;
 
     public Callback(CompletableFuture<Long> future, long ackedLength,
-        Collection<Channel> replicas) {
+        final Collection<Channel> replicas, long packetDataLen) {
       this.future = future;
       this.ackedLength = ackedLength;
+      this.packetDataLen = packetDataLen;
+      this.flushTimestamp = EnvironmentEdgeManager.currentTime();
       if (replicas.isEmpty()) {
         this.unfinishedReplicas = Collections.emptySet();
       } else {
         this.unfinishedReplicas =
           Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
-        replicas.stream().map(c -> c.id()).forEachOrdered(unfinishedReplicas::add);
+        replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add);
       }
     }
   }
@@ -177,6 +185,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   private volatile State state;
 
+  private final StreamSlowMonitor streamSlowMonitor;
+
   // all lock-free to make it run faster
   private void completed(Channel channel) {
     for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
@@ -184,6 +194,10 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
       // if the current unfinished replicas does not contain us then it means that we have already
       // acked this one, let's iterate to find the one we have not acked yet.
       if (c.unfinishedReplicas.remove(channel.id())) {
+        long current = EnvironmentEdgeManager.currentTime();
+        streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen,
+          current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size());
+        c.lastAckTimestamp = current;
         if (c.unfinishedReplicas.isEmpty()) {
           // we need to remove first before complete the future. It is possible that after we
           // complete the future the upper layer will call close immediately before we remove the
@@ -246,7 +260,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
       }
       break;
     }
-    datanodeList.forEach(ch -> ch.close());
+    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
   }
 
   @Sharable
@@ -314,7 +328,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   private void setupReceiver(int timeoutMs) {
     AckHandler ackHandler = new AckHandler(timeoutMs);
-    for (Channel ch : datanodeList) {
+    for (Channel ch : datanodeInfoMap.keySet()) {
       ch.pipeline().addLast(
         new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
         new ProtobufVarint32FrameDecoder(),
@@ -325,8 +339,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs,
       DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
-      LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> datanodeList,
-      DataChecksum summer, ByteBufAllocator alloc) {
+      LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
+      DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
     this.conf = conf;
     this.dfs = dfs;
     this.client = client;
@@ -337,13 +351,14 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     this.block = locatedBlock.getBlock();
     this.locations = locatedBlock.getLocations();
     this.encryptor = encryptor;
-    this.datanodeList = datanodeList;
+    this.datanodeInfoMap = datanodeInfoMap;
     this.summer = summer;
     this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
     this.alloc = alloc;
     this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize());
     this.state = State.STREAMING;
     setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
+    this.streamSlowMonitor = streamSlowMonitor;
   }
 
   @Override
@@ -395,7 +410,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     ByteBuf headerBuf = alloc.buffer(headerLen);
     header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
     headerBuf.writerIndex(headerLen);
-    Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
+    Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen,
+      datanodeInfoMap.keySet(), dataLen);
     waitingAckQueue.addLast(c);
     // recheck again after we pushed the callback to queue
     if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
@@ -404,7 +420,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
       waitingAckQueue.removeFirst();
       return;
     }
-    datanodeList.forEach(ch -> {
+    // TODO: we should perhaps measure time taken per DN here;
+    // we could collect statistics per DN, and/or exclude bad nodes in createOutput.
+    datanodeInfoMap.keySet().forEach(ch -> {
       ch.write(headerBuf.retainedDuplicate());
       ch.write(checksumBuf.retainedDuplicate());
       ch.writeAndFlush(dataBuf.retainedDuplicate());
@@ -426,7 +444,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
     Callback lastFlush = waitingAckQueue.peekLast();
     if (lastFlush != null) {
-      Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList());
+      Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen);
       waitingAckQueue.addLast(c);
       // recheck here if we have already removed the previous callback from the queue
       if (waitingAckQueue.peekFirst() == c) {
@@ -526,8 +544,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
     headerBuf.writerIndex(headerLen);
     CompletableFuture<Long> future = new CompletableFuture<>();
-    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList));
-    datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
+    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
+    datanodeInfoMap.keySet().forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
     headerBuf.release();
     try {
       future.get();
@@ -544,13 +562,14 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
    * The close method when error occurred. Now we just call recoverFileLease.
    */
   @Override
+  @SuppressWarnings("FutureReturnValueIgnored")
   public void recoverAndClose(CancelableProgressable reporter) throws IOException {
     if (buf != null) {
       buf.release();
       buf = null;
     }
-    datanodeList.forEach(ch -> ch.close());
-    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
+    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
+    datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
     endFileLease(client, fileId);
     RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
       reporter == null ? new CancelOnClose(client) : reporter);
@@ -561,11 +580,12 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
    * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
    */
   @Override
+  @SuppressWarnings("FutureReturnValueIgnored")
   public void close() throws IOException {
     endBlock();
     state = State.CLOSED;
-    datanodeList.forEach(ch -> ch.close());
-    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
+    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
+    datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
     block.setNumBytes(ackedBlockLength);
     completeFile(client, namenode, src, clientName, block, fileId);
   }
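To make the new Callback bookkeeping concrete, here is a self-contained sketch (not from the patch; timestamps, datanode names, and the report helper are invented) of the per-packet timing flow that completed() performs for each replica ack:

final class AckTimingSketch {
  public static void main(String[] args) {
    long flushTimestamp = 1_000L;   // recorded in the Callback constructor
    long packetDataLen = 64 * 1024; // bytes carried by this packet
    long lastAckTimestamp = -1L;    // no replica has acked yet

    // Replica A acks at t=1_200 ms: a 200 ms sample, 1 replica still pending.
    long current = 1_200L;
    report("dn-A", packetDataLen, current - flushTimestamp, lastAckTimestamp, 1);
    lastAckTimestamp = current;

    // Replica B acks at t=8_000 ms: a 7_000 ms sample, 0 replicas pending.
    current = 8_000L;
    report("dn-B", packetDataLen, current - flushTimestamp, lastAckTimestamp, 0);
  }

  // Stand-in for StreamSlowMonitor.checkProcessTimeAndSpeed(...).
  static void report(String dn, long len, long processTimeMs, long lastAck, int unfinished) {
    System.out.printf("%s: %d bytes in %d ms (lastAck=%d, unfinished=%d)%n",
      dn, len, processTimeMs, lastAck, unfinished);
  }
}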

View File

@@ -33,9 +33,12 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.Encryptor;
@@ -47,6 +50,8 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSOutputStream;
@@ -128,8 +133,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   // Timeouts for communicating with DataNode for streaming writes/reads
   public static final int READ_TIMEOUT = 60 * 1000;
 
-  private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
-
   private interface LeaseManager {
 
     void begin(DFSClient client, long inodeId);
@@ -511,15 +514,20 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
       boolean overwrite, boolean createParent, short replication, long blockSize,
-      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
+      StreamSlowMonitor monitor) throws IOException {
     Configuration conf = dfs.getConf();
     DFSClient client = dfs.getClient();
     String clientName = client.getClientName();
    ClientProtocol namenode = client.getNamenode();
     int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
       DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
-    DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
+    ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
+    Set<DatanodeInfo> toExcludeNodes =
+      new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
     for (int retry = 0;; retry++) {
+      LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src,
+        toExcludeNodes, retry);
       HdfsFileStatus stat;
       try {
         stat = FILE_CREATOR.create(namenode, src,
@@ -539,24 +547,26 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       List<Future<Channel>> futureList = null;
       try {
         DataChecksum summer = createChecksum(client);
-        locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes,
-          stat.getFileId(), null, null);
-        List<Channel> datanodeList = new ArrayList<>();
+        locatedBlock = namenode.addBlock(src, client.getClientName(), null,
+          toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
+        Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
         futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
           PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
         for (int i = 0, n = futureList.size(); i < n; i++) {
+          DatanodeInfo datanodeInfo = locatedBlock.getLocations()[i];
           try {
-            datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
+            datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
           } catch (Exception e) {
             // exclude the broken DN next time
-            excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
+            toExcludeNodes.add(datanodeInfo);
+            excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error");
             throw e;
           }
         }
         Encryptor encryptor = createEncryptor(conf, stat, client);
         FanOutOneBlockAsyncDFSOutput output =
           new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
-            stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
+            stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
         succ = true;
         return output;
       } catch (RemoteException e) {
@@ -607,14 +617,15 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
    */
   public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
       boolean overwrite, boolean createParent, short replication, long blockSize,
-      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
+      final StreamSlowMonitor monitor) throws IOException {
     return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

       @Override
       public FanOutOneBlockAsyncDFSOutput doCall(Path p)
           throws IOException, UnresolvedLinkException {
         return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
-          blockSize, eventLoopGroup, channelClass);
+          blockSize, eventLoopGroup, channelClass, monitor);
       }

       @Override
@Override @Override

View File

@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.asyncfs.monitor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
/**
* The class to manage the excluded datanodes of the WALs on the regionserver.
*/
@InterfaceAudience.Private
public class ExcludeDatanodeManager implements ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(ExcludeDatanodeManager.class);
/**
* Configuration key for the maximum count of excluded datanodes.
*/
public static final String WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY =
"hbase.regionserver.async.wal.max.exclude.datanode.count";
public static final int DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT = 3;
/**
* Configuration key for the TTL of excluded datanode entries.
*/
public static final String WAL_EXCLUDE_DATANODE_TTL_KEY =
"hbase.regionserver.async.wal.exclude.datanode.info.ttl.hour";
public static final int DEFAULT_WAL_EXCLUDE_DATANODE_TTL = 6; // 6 hours
private volatile Cache<DatanodeInfo, Long> excludeDNsCache;
private final int maxExcludeDNCount;
private final Configuration conf;
// This is a map of providerId->StreamSlowMonitor
private final Map<String, StreamSlowMonitor> streamSlowMonitors =
new ConcurrentHashMap<>(1);
public ExcludeDatanodeManager(Configuration conf) {
this.conf = conf;
this.maxExcludeDNCount = conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT);
this.excludeDNsCache = CacheBuilder.newBuilder()
.expireAfterWrite(this.conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY,
DEFAULT_WAL_EXCLUDE_DATANODE_TTL), TimeUnit.HOURS)
.maximumSize(this.maxExcludeDNCount)
.build();
}
/**
* Try to add a datanode to the regionserver's exclude cache.
* @param datanodeInfo the datanode to be added to the exclude cache
* @param cause the reason why the datanode should be excluded
* @return true if the datanode was added to the exclude cache, false otherwise
*/
public boolean tryAddExcludeDN(DatanodeInfo datanodeInfo, String cause) {
boolean alreadyMarkedSlow = getExcludeDNs().containsKey(datanodeInfo);
if (!alreadyMarkedSlow) {
excludeDNsCache.put(datanodeInfo, EnvironmentEdgeManager.currentTime());
LOG.info(
"Added datanode: {} to exclude cache by [{}] success, current excludeDNsCache size={}",
datanodeInfo, cause, excludeDNsCache.size());
return true;
}
LOG.debug("Try add datanode {} to exclude cache by [{}] failed, "
+ "current exclude DNs are {}", datanodeInfo, cause, getExcludeDNs().keySet());
return false;
}
public StreamSlowMonitor getStreamSlowMonitor(String name) {
String key = name == null || name.isEmpty() ? "defaultMonitorName" : name;
return streamSlowMonitors
.computeIfAbsent(key, k -> new StreamSlowMonitor(conf, key, this));
}
public Map<DatanodeInfo, Long> getExcludeDNs() {
return excludeDNsCache.asMap();
}
@Override
public void onConfigurationChange(Configuration conf) {
for (StreamSlowMonitor monitor : streamSlowMonitors.values()) {
monitor.onConfigurationChange(conf);
}
this.excludeDNsCache = CacheBuilder.newBuilder().expireAfterWrite(
this.conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, DEFAULT_WAL_EXCLUDE_DATANODE_TTL),
TimeUnit.HOURS).maximumSize(this.conf
.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY, DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT))
.build();
}
}
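As a usage illustration (not part of the patch), a sketch of how the manager's API fits together; the class name, monitor name "wal-1", and datanode fields are invented:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

final class ExcludeDatanodeManagerSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    ExcludeDatanodeManager manager = new ExcludeDatanodeManager(conf);
    // One monitor per WAL; the name "wal-1" is illustrative.
    StreamSlowMonitor monitor = manager.getStreamSlowMonitor("wal-1");
    DatanodeInfo dn = new DatanodeInfo.DatanodeInfoBuilder()
      .setIpAddr("127.0.0.1").setHostName("dn1").setDatanodeUuid("uuid-1").build();
    // Direct exclusion, as done on connect failures in createOutput.
    manager.tryAddExcludeDN(dn, "connect error");
    // The exclude set is consulted when allocating new WAL blocks.
    System.out.println(manager.getExcludeDNs().keySet());
  }
}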

View File

@@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.asyncfs.monitor;
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.DEFAULT_WAL_EXCLUDE_DATANODE_TTL;
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT;
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.WAL_EXCLUDE_DATANODE_TTL_KEY;
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
/**
* Class for monitoring the WAL file flush performance.
* Each active WAL file has a StreamSlowMonitor.
*/
@InterfaceAudience.Private
public class StreamSlowMonitor implements ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(StreamSlowMonitor.class);
/**
* Configuration key for the minimum number of slow detections before a datanode is excluded.
* If a datanode is detected as slow this many times, it will be added to the exclude
* datanode cache of this regionserver by
* {@link ExcludeDatanodeManager#tryAddExcludeDN(DatanodeInfo, String)}.
*/
private static final String WAL_SLOW_DETECT_MIN_COUNT_KEY =
"hbase.regionserver.async.wal.min.slow.detect.count";
private static final int DEFAULT_WAL_SLOW_DETECT_MIN_COUNT = 3;
/**
* Configuration key for the TTL of the slow-detection records kept for a datanode.
*/
private static final String WAL_SLOW_DETECT_DATA_TTL_KEY =
"hbase.regionserver.async.wal.slow.detect.data.ttl.ms";
private static final long DEFAULT_WAL_SLOW_DETECT_DATA_TTL = 10 * 60 * 1000; // 10min in ms
/**
* Configuration key for the minimum packet length used by the speed check.
* Packets whose data length is smaller than this value are checked for slowness by
* processing time, while larger packets are checked by flush speed.
*/
private static final String DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY =
"hbase.regionserver.async.wal.datanode.slow.check.speed.packet.data.length.min";
private static final long DEFAULT_DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH =
64 * 1024; //64KB
/**
* Configuration key for the slow packet process time, the duration from send to ACK.
* The processing-time check applies to packets whose data length is smaller than
* {@link StreamSlowMonitor#DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY}.
*/
public static final String DATANODE_SLOW_PACKET_PROCESS_TIME_KEY =
"hbase.regionserver.async.wal.datanode.slow.packet.process.time.millis";
private static final long DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME = 6000; // 6s in ms
/**
* Configuration key for the minimum flush speed of large packets (those at least as long as
* {@link StreamSlowMonitor#DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY}).
* E.g. at 20KB/s, a 64KB packet should be processed in less than 3.2s.
*/
private static final String DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY =
"hbase.regionserver.async.wal.datanode.slow.packet.speed.min.kbs";
private static final double DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED = 20; // 20KB/s
private final String name;
// this is a map of datanodeInfo->queued slow PacketAckData
private final LoadingCache<DatanodeInfo, Deque<PacketAckData>> datanodeSlowDataQueue;
private final ExcludeDatanodeManager excludeDatanodeManager;
private int minSlowDetectCount;
private long slowDataTtl;
private long slowPacketAckMs;
private double minPacketFlushSpeedKBs;
private long minLengthForSpeedCheck;
public StreamSlowMonitor(Configuration conf, String name,
ExcludeDatanodeManager excludeDatanodeManager) {
setConf(conf);
this.name = name;
this.excludeDatanodeManager = excludeDatanodeManager;
this.datanodeSlowDataQueue = CacheBuilder.newBuilder()
.maximumSize(conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT))
.expireAfterWrite(conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY,
DEFAULT_WAL_EXCLUDE_DATANODE_TTL), TimeUnit.HOURS)
.build(new CacheLoader<DatanodeInfo, Deque<PacketAckData>>() {
@Override
public Deque<PacketAckData> load(DatanodeInfo key) throws Exception {
return new ConcurrentLinkedDeque<>();
}
});
LOG.info("New stream slow monitor {}", this.name);
}
public static StreamSlowMonitor create(Configuration conf, String name) {
return new StreamSlowMonitor(conf, name, new ExcludeDatanodeManager(conf));
}
/**
* Check if the packet process time shows that the relevant datanode is a slow node.
* @param datanodeInfo the datanode that processed the packet
* @param packetDataLen the data length of the packet (in bytes)
* @param processTimeMs the process time (in ms) of the packet on the datanode
* @param lastAckTimestamp the timestamp of the last ack of this packet from another datanode
* @param unfinished the number of replicas that have not yet acked this packet
*/
public void checkProcessTimeAndSpeed(DatanodeInfo datanodeInfo, long packetDataLen,
long processTimeMs, long lastAckTimestamp, int unfinished) {
long current = EnvironmentEdgeManager.currentTime();
// Two conditions are used to determine whether a datanode is slow:
// 1. For a small packet, we just apply a simple time limit, without considering
// the size of the packet.
// 2. For a large packet, we calculate the flush speed and check whether it is too slow.
boolean slow = (packetDataLen <= minLengthForSpeedCheck && processTimeMs > slowPacketAckMs) || (
packetDataLen > minLengthForSpeedCheck
&& (double) packetDataLen / processTimeMs < minPacketFlushSpeedKBs);
if (slow) {
// Check whether there is a large gap between the ack timestamps of the replicas,
// to avoid misjudging a datanode because of GC stop-the-world pauses.
if ((lastAckTimestamp > 0 && current - lastAckTimestamp > slowPacketAckMs / 2) || (
lastAckTimestamp <= 0 && unfinished == 0)) {
LOG.info("Slow datanode: {}, data length={}, duration={}ms, unfinishedReplicas={}, "
+ "lastAckTimestamp={}, monitor name: {}", datanodeInfo, packetDataLen, processTimeMs,
unfinished, lastAckTimestamp, this.name);
if (addSlowAckData(datanodeInfo, packetDataLen, processTimeMs)) {
excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "slow packet ack");
}
}
}
}
@Override
public void onConfigurationChange(Configuration conf) {
setConf(conf);
}
private boolean addSlowAckData(DatanodeInfo datanodeInfo, long dataLength, long processTime) {
Deque<PacketAckData> slowDNQueue = datanodeSlowDataQueue.getUnchecked(datanodeInfo);
long current = EnvironmentEdgeManager.currentTime();
while (!slowDNQueue.isEmpty() && (current - slowDNQueue.getFirst().getTimestamp() > slowDataTtl
|| slowDNQueue.size() >= minSlowDetectCount)) {
slowDNQueue.removeFirst();
}
slowDNQueue.addLast(new PacketAckData(dataLength, processTime));
return slowDNQueue.size() >= minSlowDetectCount;
}
private void setConf(Configuration conf) {
this.minSlowDetectCount = conf.getInt(WAL_SLOW_DETECT_MIN_COUNT_KEY,
DEFAULT_WAL_SLOW_DETECT_MIN_COUNT);
this.slowDataTtl = conf.getLong(WAL_SLOW_DETECT_DATA_TTL_KEY, DEFAULT_WAL_SLOW_DETECT_DATA_TTL);
this.slowPacketAckMs = conf.getLong(DATANODE_SLOW_PACKET_PROCESS_TIME_KEY,
DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME);
this.minLengthForSpeedCheck = conf.getLong(
DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY,
DEFAULT_DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH);
this.minPacketFlushSpeedKBs = conf.getDouble(DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY,
DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED);
}
public ExcludeDatanodeManager getExcludeDatanodeManager() {
return excludeDatanodeManager;
}
private static class PacketAckData {
private final long dataLength;
private final long processTime;
private final long timestamp;
public PacketAckData(long dataLength, long processTime) {
this.dataLength = dataLength;
this.processTime = processTime;
this.timestamp = EnvironmentEdgeManager.currentTime();
}
public long getDataLength() {
return dataLength;
}
public long getProcessTime() {
return processTime;
}
public long getTimestamp() {
return timestamp;
}
}
}
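A worked example may help here: a self-contained sketch (not from the patch; the numbers mirror the unit tests below, and the default thresholds are assumed unchanged) of the two slow conditions evaluated in checkProcessTimeAndSpeed:

final class SlowCheckArithmetic {
  public static void main(String[] args) {
    // Defaults from this class, assumed not overridden in configuration.
    long minLengthForSpeedCheck = 64 * 1024; // bytes
    long slowPacketAckMs = 6_000;            // ms
    double minPacketFlushSpeedKBs = 20;      // KB/s, numerically bytes/ms

    // Small packet: 5_000 bytes taking 7_000 ms exceeds the 6 s limit -> slow.
    long smallLen = 5_000, smallTime = 7_000;
    boolean smallSlow = smallLen <= minLengthForSpeedCheck && smallTime > slowPacketAckMs;

    // Large packet: 100_000 bytes in 5_100 ms is ~19.6 bytes/ms (~19.6 KB/s) -> slow.
    long largeLen = 100_000, largeTime = 5_100;
    boolean largeSlow = largeLen > minLengthForSpeedCheck
      && (double) largeLen / largeTime < minPacketFlushSpeedKBs;

    System.out.println(smallSlow + " " + largeSlow); // prints: true true
  }
}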

View File

@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.asyncfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ SmallTests.class })
public class TestExcludeDatanodeManager {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestExcludeDatanodeManager.class);
@Test
public void testExcludeSlowDNBySpeed() {
Configuration conf = HBaseConfiguration.create();
ExcludeDatanodeManager excludeDatanodeManager = new ExcludeDatanodeManager(conf);
StreamSlowMonitor streamSlowDNsMonitor =
excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
DatanodeInfo datanodeInfo =
new DatanodeInfo.DatanodeInfoBuilder().setIpAddr("0.0.0.0").setHostName("hostname1")
.setDatanodeUuid("uuid1").setXferPort(111).setInfoPort(222).setInfoSecurePort(333)
.setIpcPort(444).setNetworkLocation("location1").build();
streamSlowDNsMonitor
.checkProcessTimeAndSpeed(datanodeInfo, 100000, 5100,
System.currentTimeMillis() - 5100, 0);
streamSlowDNsMonitor
.checkProcessTimeAndSpeed(datanodeInfo, 100000, 5100,
System.currentTimeMillis() - 5100, 0);
streamSlowDNsMonitor
.checkProcessTimeAndSpeed(datanodeInfo, 100000, 5100,
System.currentTimeMillis() - 5100, 0);
assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
assertTrue(excludeDatanodeManager.getExcludeDNs().containsKey(datanodeInfo));
}
@Test
public void testExcludeSlowDNByProcessTime() {
Configuration conf = HBaseConfiguration.create();
ExcludeDatanodeManager excludeDatanodeManager = new ExcludeDatanodeManager(conf);
StreamSlowMonitor streamSlowDNsMonitor =
excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
DatanodeInfo datanodeInfo =
new DatanodeInfo.DatanodeInfoBuilder().setIpAddr("0.0.0.0").setHostName("hostname1")
.setDatanodeUuid("uuid1").setXferPort(111).setInfoPort(222).setInfoSecurePort(333)
.setIpcPort(444).setNetworkLocation("location1").build();
streamSlowDNsMonitor
.checkProcessTimeAndSpeed(datanodeInfo, 5000, 7000,
System.currentTimeMillis() - 7000, 0);
streamSlowDNsMonitor
.checkProcessTimeAndSpeed(datanodeInfo, 5000, 7000,
System.currentTimeMillis() - 7000, 0);
streamSlowDNsMonitor
.checkProcessTimeAndSpeed(datanodeInfo, 5000, 7000,
System.currentTimeMillis() - 7000, 0);
assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
assertTrue(excludeDatanodeManager.getExcludeDNs().containsKey(datanodeInfo));
}
}
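For completeness, a hedged sketch of how an operator could tune the thresholds these tests exercise; the configuration keys are the ones introduced by this patch, while the chosen values are arbitrary examples:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

final class TuningSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Treat packets acked slower than 3 s as slow (default 6 s).
    conf.setLong("hbase.regionserver.async.wal.datanode.slow.packet.process.time.millis", 3000L);
    // Require 5 slow detections before excluding a datanode (default 3).
    conf.setInt("hbase.regionserver.async.wal.min.slow.detect.count", 5);
    // Keep excluded datanodes out of new WAL pipelines for 12 hours (default 6).
    conf.setLong("hbase.regionserver.async.wal.exclude.datanode.info.ttl.hour", 12L);
  }
}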

View File

@@ -39,6 +39,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -78,6 +81,8 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
   private static int READ_TIMEOUT_MS = 2000;
 
+  private static StreamSlowMonitor MONITOR;
+
   @Rule
   public TestName name = new TestName();
@@ -88,6 +93,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     FS = CLUSTER.getFileSystem();
     EVENT_LOOP_GROUP = new NioEventLoopGroup();
     CHANNEL_CLASS = NioSocketChannel.class;
+    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
   }
 
   @AfterClass
@@ -133,7 +139,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
     writeAndVerify(FS, f, out);
   }
@@ -142,7 +148,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
     byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b, 0, b.length);
@@ -171,7 +177,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
     Thread.sleep(READ_TIMEOUT_MS * 2);
     // the connection to datanode should still alive.
     writeAndVerify(FS, f, out);
@@ -186,7 +192,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     try {
       FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
       fail("should fail with parent does not exist");
     } catch (RemoteException e) {
       LOG.info("expected exception caught", e);
@@ -209,7 +215,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     Path f = new Path("/test");
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
-      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
+      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) {
       // should exclude the dead dn when retry so here we only have 2 DNs in pipeline
       assertEquals(2, output.getPipeline().length);
     } finally {
@@ -217,12 +223,42 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     }
   }
 
+  @Test
+  public void testExcludeFailedConnectToDatanode()
+      throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
+      InvocationTargetException, InterruptedException, NoSuchFieldException {
+    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
+    xceiverServerDaemonField.setAccessible(true);
+    Class<?> xceiverServerClass =
+      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
+    numPeersMethod.setAccessible(true);
+    // make one datanode broken
+    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
+    Path f = new Path("/test");
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    ExcludeDatanodeManager excludeDatanodeManager =
+      new ExcludeDatanodeManager(HBaseConfiguration.create());
+    StreamSlowMonitor streamSlowDNsMonitor =
+      excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
+    assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
+    try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
+      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop,
+      CHANNEL_CLASS, streamSlowDNsMonitor)) {
+      // should exclude the dead dn when retry so here we only have 2 DNs in pipeline
+      assertEquals(2, output.getPipeline().length);
+      assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
+    } finally {
+      CLUSTER.restartDataNode(dnProp);
+    }
+  }
+
   @Test
   public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
+      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR);
     byte[] b = new byte[50 * 1024 * 1024];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b);

View File

@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -49,10 +50,13 @@ public class TestLocalAsyncOutput {
   private static final HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility();
 
+  private static StreamSlowMonitor MONITOR;
+
   @AfterClass
   public static void tearDownAfterClass() throws IOException {
     TEST_UTIL.cleanupTestDir();
     GROUP.shutdownGracefully();
+    MONITOR = StreamSlowMonitor.create(TEST_UTIL.getConfiguration(), "testMonitor");
   }
 
   @Test
@@ -61,7 +65,7 @@ public class TestLocalAsyncOutput {
     Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
     FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
     AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
-      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS);
+      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
   }
 }

View File

@@ -21,7 +21,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
-
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -40,6 +39,7 @@ import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
 import org.apache.hadoop.hbase.security.SecurityConstants;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -62,7 +62,6 @@ import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -102,6 +101,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
   private static String TEST_KEY_NAME = "test_key";
 
+  private static StreamSlowMonitor MONITOR;
+
   @Rule
   public TestName name = new TestName();
@@ -187,6 +188,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(),
       PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm());
     HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class);
+    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
   }
 
   @AfterClass
@@ -254,7 +256,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
   private void test(Path file) throws IOException, InterruptedException, ExecutionException {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
   }

View File

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Cipher;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
@@ -47,7 +48,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
@@ -169,7 +169,8 @@ public abstract class AbstractProtobufLogWriter {
   }
 
   public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
-      long blocksize) throws IOException, StreamLacksCapabilityException {
+      long blocksize, StreamSlowMonitor monitor) throws IOException,
+      StreamLacksCapabilityException {
     this.conf = conf;
     boolean doCompress = initializeCompressionContext(conf, path);
     this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
@@ -177,7 +178,7 @@ public abstract class AbstractProtobufLogWriter {
     short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
       CommonFSUtils.getDefaultReplication(fs, path));
-    initOutput(fs, path, overwritable, bufferSize, replication, blocksize);
+    initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);
     boolean doTagCompress = doCompress &&
       conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
@@ -266,7 +267,8 @@ public abstract class AbstractProtobufLogWriter {
   }
 
   protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException, StreamLacksCapabilityException;
+      short replication, long blockSize, StreamSlowMonitor monitor)
+      throws IOException, StreamLacksCapabilityException;
 
   /**
    * return the file length after written.
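The init() contract change above ripples into every concrete writer: each now receives a StreamSlowMonitor and forwards it to initOutput(). A hedged sketch of direct initialization, mirroring the call shape the tests in this commit use; the method name is an assumption, and fs, path, and conf are taken as given.

void initWriterExample(FileSystem fs, Path path, Configuration conf) throws Exception {
  ProtobufLogWriter writer = new ProtobufLogWriter();
  // per-path monitor; the sync writer appears to take it only for signature parity (see below)
  writer.init(fs, path, conf, true /* overwritable */,
    WALUtil.getWALBlockSize(conf, fs, path),
    StreamSlowMonitor.create(conf, path.getName()));
  writer.close();
}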
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -198,22 +199,26 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final int waitOnShutdownInSeconds;
 
+  private final StreamSlowMonitor streamSlowMonitor;
+
   public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
       Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
       String prefix, String suffix, EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
     this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
-      eventLoopGroup, channelClass);
+      eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, "monitorForSuffix"));
   }
 
   public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
       String archiveDir, Configuration conf, List<WALActionsListener> listeners,
       boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
-      Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
+      Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
+      throws FailedLogCloseException, IOException {
     super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
       suffix);
     this.eventLoopGroup = eventLoopGroup;
     this.channelClass = channelClass;
+    this.streamSlowMonitor = monitor;
     Supplier<Boolean> hasConsumerTask;
     if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
       this.consumeExecutor = eventLoopGroup.next();
@@ -655,8 +660,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   @Override
   protected AsyncWriter createWriterInstance(Path path) throws IOException {
-    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false,
-      this.blocksize, eventLoopGroup, channelClass);
+    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
+      eventLoopGroup, channelClass, streamSlowMonitor);
   }
 
   private void waitForSafePoint() {
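A hedged construction sketch for the new explicit-monitor constructor; the method name, directory names, empty listener list, and NioSocketChannel (the shaded org.apache.hbase.thirdparty netty class) are placeholder choices, not values taken from this commit.

void createWalExample(FileSystem fs, Path rootDir, Configuration conf,
    EventLoopGroup eventLoopGroup) throws Exception {
  StreamSlowMonitor walMonitor = StreamSlowMonitor.create(conf, "exampleWal");
  AsyncFSWAL wal = new AsyncFSWAL(fs, null /* abortable */, rootDir, "WALs", "oldWALs",
    conf, Collections.<WALActionsListener> emptyList(), true /* failIfWALExists */,
    null /* prefix */, null /* suffix */, eventLoopGroup,
    NioSocketChannel.class, walMonitor);
  // every writer this WAL rolls to via createWriterInstance() shares walMonitor
}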
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ByteBufferWriter;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -176,9 +177,10 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
 
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
+      short replication, long blockSize, StreamSlowMonitor monitor) throws IOException,
+      StreamLacksCapabilityException {
     this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
-      blockSize, eventLoopGroup, channelClass);
+      blockSize, eventLoopGroup, channelClass, monitor);
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }
@@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.util.AtomicUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
@@ -104,7 +104,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
 
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
+      short replication, long blockSize, StreamSlowMonitor monitor) throws IOException,
+      StreamLacksCapabilityException {
     this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
       blockSize, false);
     if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
@@ -60,8 +61,8 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
      * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
      *         meet the needs of the given Writer implementation.
      */
-    void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize)
-        throws IOException, CommonFSUtils.StreamLacksCapabilityException;
+    void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize,
+        StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException;
   }
 
   private EventLoopGroup eventLoopGroup;
@@ -70,10 +71,10 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
   @Override
   protected AsyncFSWAL createWAL() throws IOException {
     return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
       CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
       getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
       META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
-      channelClass);
+      channelClass, factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId));
   }
 
   @Override
@@ -97,7 +98,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
       boolean overwritable, EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass) throws IOException {
     return createAsyncWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path),
-      eventLoopGroup, channelClass);
+      eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, path.getName()));
   }
 
   /**
@@ -105,14 +106,14 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
    */
   public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
       boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup,
-      Class<? extends Channel> channelClass) throws IOException {
+      Class<? extends Channel> channelClass, StreamSlowMonitor monitor) throws IOException {
     // Configuration already does caching for the Class lookup.
     Class<? extends AsyncWriter> logWriterClass = conf.getClass(
       WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class);
     try {
       AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
         .newInstance(eventLoopGroup, channelClass);
-      writer.init(fs, path, conf, overwritable, blocksize);
+      writer.init(fs, path, conf, overwritable, blocksize, monitor);
       return writer;
     } catch (Exception e) {
       if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
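A hedged sketch of calling the widened static factory directly; the method name, the per-writer monitor label, and the choice of NioSocketChannel are illustrative, not prescribed by this commit.

void createWriterExample(Configuration conf, FileSystem fs, Path path,
    EventLoopGroup eventLoopGroup) throws IOException {
  // standalone callers can supply their own monitor instead of the per-path default
  AsyncFSWALProvider.AsyncWriter writer = AsyncFSWALProvider.createAsyncWriter(conf, fs, path,
    false /* overwritable */, WALUtil.getWALBlockSize(conf, fs, path),
    eventLoopGroup, NioSocketChannel.class,
    StreamSlowMonitor.create(conf, "exampleWriterMonitor"));
  writer.close();
}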
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
@@ -48,8 +49,8 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
      * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
      *         meet the needs of the given Writer implementation.
      */
-    void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize)
-        throws IOException, CommonFSUtils.StreamLacksCapabilityException;
+    void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize,
+        StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException;
   }
 
   /**
@@ -76,7 +77,8 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
     try {
       writer = logWriterClass.getDeclaredConstructor().newInstance();
       FileSystem rootFs = FileSystem.get(path.toUri(), conf);
-      writer.init(rootFs, path, conf, overwritable, blocksize);
+      writer.init(rootFs, path, conf, overwritable, blocksize,
+        StreamSlowMonitor.create(conf, path.getName()));
       return writer;
     } catch (Exception e) {
       if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
@@ -21,12 +21,12 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -105,6 +105,8 @@ public class WALFactory {
 
   private final Configuration conf;
 
+  private final ExcludeDatanodeManager excludeDatanodeManager;
+
   // Used for the singleton WALFactory, see below.
   private WALFactory(Configuration conf) {
     // this code is duplicated here so we can keep our members final.
@@ -121,6 +123,7 @@ public class WALFactory {
     provider = null;
     factoryId = SINGLETON_ID;
     this.abortable = null;
+    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
   }
 
   Providers getDefaultProvider() {
@@ -198,6 +201,7 @@ public class WALFactory {
       AbstractFSWALProvider.Reader.class);
     this.conf = conf;
     this.factoryId = factoryId;
+    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
     this.abortable = abortable;
     // end required early initialization
     if (conf.getBoolean(WAL_ENABLED, true)) {
@@ -499,4 +503,8 @@ public class WALFactory {
   public final WALProvider getMetaWALProvider() {
     return this.metaProvider.get();
   }
+
+  public ExcludeDatanodeManager getExcludeDatanodeManager() {
+    return excludeDatanodeManager;
+  }
 }
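Each WALFactory now owns a single ExcludeDatanodeManager, so the providers it creates appear to share one exclusion list while each gets its own named StreamSlowMonitor. A hedged sketch of the lookup, assuming the existing WALFactory(Configuration, String factoryId) constructor; both string identifiers are placeholders.

void monitorLookupExample(Configuration conf) throws IOException {
  // assumed constructor: WALFactory(Configuration, String factoryId)
  WALFactory walFactory = new WALFactory(conf, "exampleFactoryId");
  ExcludeDatanodeManager excludeManager = walFactory.getExcludeDatanodeManager();
  // the same lookup AsyncFSWALProvider.createWAL() performs above
  StreamSlowMonitor providerMonitor = excludeManager.getStreamSlowMonitor("exampleProviderId");
}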
@@ -67,6 +67,9 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
@@ -1233,7 +1236,8 @@ public abstract class AbstractTestWALReplay {
       StreamLacksCapabilityException {
     fs.mkdirs(file.getParent());
     ProtobufLogWriter writer = new ProtobufLogWriter();
-    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file));
+    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file),
+      StreamSlowMonitor.create(conf, "testMonitor"));
     for (FSWALEntry entry : entries) {
       writer.append(entry);
     }
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
 // imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -214,7 +215,8 @@ public class IOTestProvider implements WALProvider {
     LOG.info("creating new writer instance.");
     final ProtobufLogWriter writer = new IOTestWriter();
     try {
-      writer.init(fs, path, conf, false, this.blocksize);
+      writer.init(fs, path, conf, false, this.blocksize,
+        StreamSlowMonitor.create(conf, path.getName()));
     } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
       throw new IOException("Can't create writer instance because underlying FileSystem " +
         "doesn't support needed stream capabilities.", exception);
@@ -242,7 +244,8 @@ public class IOTestProvider implements WALProvider {
     @Override
     public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
-        long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException {
+        long blocksize, StreamSlowMonitor monitor) throws IOException,
+        CommonFSUtils.StreamLacksCapabilityException {
       Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
       if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
         doAppends = doSyncs = true;
@@ -254,7 +257,7 @@ public class IOTestProvider implements WALProvider {
       }
       LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") +
         " and syncs " + (doSyncs ? "enabled" : "disabled"));
-      super.init(fs, path, conf, overwritable, blocksize);
+      super.init(fs, path, conf, overwritable, blocksize, monitor);
     }
 
     @Override