HBASE-26347 Support detect and exclude slow DNs in fan-out of WAL (#3800)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
8f0c2dabbb
commit
b948ddbf21
|
@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.StreamCapabilities;
|
import org.apache.hadoop.fs.StreamCapabilities;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
@ -47,11 +48,11 @@ public final class AsyncFSOutputHelper {
|
||||||
*/
|
*/
|
||||||
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
|
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
|
||||||
boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
|
boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
|
||||||
Class<? extends Channel> channelClass)
|
Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
|
||||||
throws IOException, CommonFSUtils.StreamLacksCapabilityException {
|
throws IOException, CommonFSUtils.StreamLacksCapabilityException {
|
||||||
if (fs instanceof DistributedFileSystem) {
|
if (fs instanceof DistributedFileSystem) {
|
||||||
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
|
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
|
||||||
overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
|
overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor);
|
||||||
}
|
}
|
||||||
final FSDataOutputStream out;
|
final FSDataOutputStream out;
|
||||||
int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
|
int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
|
||||||
|
|
|
@ -32,7 +32,7 @@ import java.nio.ByteBuffer;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
@ -44,7 +44,9 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.crypto.Encryptor;
|
import org.apache.hadoop.crypto.Encryptor;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
|
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
|
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
@ -67,6 +69,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
|
||||||
|
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
|
||||||
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
|
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
|
||||||
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
|
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
|
||||||
|
@ -120,7 +123,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
|
|
||||||
private final Encryptor encryptor;
|
private final Encryptor encryptor;
|
||||||
|
|
||||||
private final List<Channel> datanodeList;
|
private final Map<Channel, DatanodeInfo> datanodeInfoMap;
|
||||||
|
|
||||||
private final DataChecksum summer;
|
private final DataChecksum summer;
|
||||||
|
|
||||||
|
@ -136,17 +139,22 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
|
|
||||||
// should be backed by a thread safe collection
|
// should be backed by a thread safe collection
|
||||||
private final Set<ChannelId> unfinishedReplicas;
|
private final Set<ChannelId> unfinishedReplicas;
|
||||||
|
private final long packetDataLen;
|
||||||
|
private final long flushTimestamp;
|
||||||
|
private long lastAckTimestamp = -1;
|
||||||
|
|
||||||
public Callback(CompletableFuture<Long> future, long ackedLength,
|
public Callback(CompletableFuture<Long> future, long ackedLength,
|
||||||
Collection<Channel> replicas) {
|
final Collection<Channel> replicas, long packetDataLen) {
|
||||||
this.future = future;
|
this.future = future;
|
||||||
this.ackedLength = ackedLength;
|
this.ackedLength = ackedLength;
|
||||||
|
this.packetDataLen = packetDataLen;
|
||||||
|
this.flushTimestamp = EnvironmentEdgeManager.currentTime();
|
||||||
if (replicas.isEmpty()) {
|
if (replicas.isEmpty()) {
|
||||||
this.unfinishedReplicas = Collections.emptySet();
|
this.unfinishedReplicas = Collections.emptySet();
|
||||||
} else {
|
} else {
|
||||||
this.unfinishedReplicas =
|
this.unfinishedReplicas =
|
||||||
Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
|
Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
|
||||||
replicas.stream().map(c -> c.id()).forEachOrdered(unfinishedReplicas::add);
|
replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -176,6 +184,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
|
|
||||||
private volatile State state;
|
private volatile State state;
|
||||||
|
|
||||||
|
private final StreamSlowMonitor streamSlowMonitor;
|
||||||
|
|
||||||
// all lock-free to make it run faster
|
// all lock-free to make it run faster
|
||||||
private void completed(Channel channel) {
|
private void completed(Channel channel) {
|
||||||
for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
|
for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
|
||||||
|
@ -183,6 +193,10 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
// if the current unfinished replicas does not contain us then it means that we have already
|
// if the current unfinished replicas does not contain us then it means that we have already
|
||||||
// acked this one, let's iterate to find the one we have not acked yet.
|
// acked this one, let's iterate to find the one we have not acked yet.
|
||||||
if (c.unfinishedReplicas.remove(channel.id())) {
|
if (c.unfinishedReplicas.remove(channel.id())) {
|
||||||
|
long current = EnvironmentEdgeManager.currentTime();
|
||||||
|
streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen,
|
||||||
|
current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size());
|
||||||
|
c.lastAckTimestamp = current;
|
||||||
if (c.unfinishedReplicas.isEmpty()) {
|
if (c.unfinishedReplicas.isEmpty()) {
|
||||||
// we need to remove first before complete the future. It is possible that after we
|
// we need to remove first before complete the future. It is possible that after we
|
||||||
// complete the future the upper layer will call close immediately before we remove the
|
// complete the future the upper layer will call close immediately before we remove the
|
||||||
|
@ -245,7 +259,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
datanodeList.forEach(ch -> ch.close());
|
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Sharable
|
@Sharable
|
||||||
|
@ -313,7 +327,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
|
|
||||||
private void setupReceiver(int timeoutMs) {
|
private void setupReceiver(int timeoutMs) {
|
||||||
AckHandler ackHandler = new AckHandler(timeoutMs);
|
AckHandler ackHandler = new AckHandler(timeoutMs);
|
||||||
for (Channel ch : datanodeList) {
|
for (Channel ch : datanodeInfoMap.keySet()) {
|
||||||
ch.pipeline().addLast(
|
ch.pipeline().addLast(
|
||||||
new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
|
new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
|
||||||
new ProtobufVarint32FrameDecoder(),
|
new ProtobufVarint32FrameDecoder(),
|
||||||
|
@ -324,8 +338,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
|
|
||||||
FanOutOneBlockAsyncDFSOutput(Configuration conf,DistributedFileSystem dfs,
|
FanOutOneBlockAsyncDFSOutput(Configuration conf,DistributedFileSystem dfs,
|
||||||
DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
|
DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
|
||||||
LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> datanodeList,
|
LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
|
||||||
DataChecksum summer, ByteBufAllocator alloc) {
|
DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.dfs = dfs;
|
this.dfs = dfs;
|
||||||
this.client = client;
|
this.client = client;
|
||||||
|
@ -336,13 +350,14 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
this.block = locatedBlock.getBlock();
|
this.block = locatedBlock.getBlock();
|
||||||
this.locations = locatedBlock.getLocations();
|
this.locations = locatedBlock.getLocations();
|
||||||
this.encryptor = encryptor;
|
this.encryptor = encryptor;
|
||||||
this.datanodeList = datanodeList;
|
this.datanodeInfoMap = datanodeInfoMap;
|
||||||
this.summer = summer;
|
this.summer = summer;
|
||||||
this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
|
this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
|
||||||
this.alloc = alloc;
|
this.alloc = alloc;
|
||||||
this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize());
|
this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize());
|
||||||
this.state = State.STREAMING;
|
this.state = State.STREAMING;
|
||||||
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
|
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
|
||||||
|
this.streamSlowMonitor = streamSlowMonitor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -394,7 +409,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
ByteBuf headerBuf = alloc.buffer(headerLen);
|
ByteBuf headerBuf = alloc.buffer(headerLen);
|
||||||
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
|
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
|
||||||
headerBuf.writerIndex(headerLen);
|
headerBuf.writerIndex(headerLen);
|
||||||
Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
|
Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen,
|
||||||
|
datanodeInfoMap.keySet(), dataLen);
|
||||||
waitingAckQueue.addLast(c);
|
waitingAckQueue.addLast(c);
|
||||||
// recheck again after we pushed the callback to queue
|
// recheck again after we pushed the callback to queue
|
||||||
if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
|
if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
|
||||||
|
@ -405,7 +421,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
}
|
}
|
||||||
// TODO: we should perhaps measure time taken per DN here;
|
// TODO: we should perhaps measure time taken per DN here;
|
||||||
// we could collect statistics per DN, and/or exclude bad nodes in createOutput.
|
// we could collect statistics per DN, and/or exclude bad nodes in createOutput.
|
||||||
datanodeList.forEach(ch -> {
|
datanodeInfoMap.keySet().forEach(ch -> {
|
||||||
ch.write(headerBuf.retainedDuplicate());
|
ch.write(headerBuf.retainedDuplicate());
|
||||||
ch.write(checksumBuf.retainedDuplicate());
|
ch.write(checksumBuf.retainedDuplicate());
|
||||||
ch.writeAndFlush(dataBuf.retainedDuplicate());
|
ch.writeAndFlush(dataBuf.retainedDuplicate());
|
||||||
|
@ -427,7 +443,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
|
long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
|
||||||
Callback lastFlush = waitingAckQueue.peekLast();
|
Callback lastFlush = waitingAckQueue.peekLast();
|
||||||
if (lastFlush != null) {
|
if (lastFlush != null) {
|
||||||
Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList());
|
Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen);
|
||||||
waitingAckQueue.addLast(c);
|
waitingAckQueue.addLast(c);
|
||||||
// recheck here if we have already removed the previous callback from the queue
|
// recheck here if we have already removed the previous callback from the queue
|
||||||
if (waitingAckQueue.peekFirst() == c) {
|
if (waitingAckQueue.peekFirst() == c) {
|
||||||
|
@ -527,8 +543,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
|
header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
|
||||||
headerBuf.writerIndex(headerLen);
|
headerBuf.writerIndex(headerLen);
|
||||||
CompletableFuture<Long> future = new CompletableFuture<>();
|
CompletableFuture<Long> future = new CompletableFuture<>();
|
||||||
waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList));
|
waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
|
||||||
datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
|
datanodeInfoMap.keySet().forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
|
||||||
headerBuf.release();
|
headerBuf.release();
|
||||||
try {
|
try {
|
||||||
future.get();
|
future.get();
|
||||||
|
@ -545,13 +561,14 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
* The close method when error occurred. Now we just call recoverFileLease.
|
* The close method when error occurred. Now we just call recoverFileLease.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@SuppressWarnings("FutureReturnValueIgnored")
|
||||||
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
|
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
|
||||||
if (buf != null) {
|
if (buf != null) {
|
||||||
buf.release();
|
buf.release();
|
||||||
buf = null;
|
buf = null;
|
||||||
}
|
}
|
||||||
datanodeList.forEach(ch -> ch.close());
|
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
|
||||||
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
|
datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
|
||||||
endFileLease(client, fileId);
|
endFileLease(client, fileId);
|
||||||
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
|
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
|
||||||
reporter == null ? new CancelOnClose(client) : reporter);
|
reporter == null ? new CancelOnClose(client) : reporter);
|
||||||
|
@ -562,11 +579,12 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
|
||||||
* {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
|
* {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@SuppressWarnings("FutureReturnValueIgnored")
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
endBlock();
|
endBlock();
|
||||||
state = State.CLOSED;
|
state = State.CLOSED;
|
||||||
datanodeList.forEach(ch -> ch.close());
|
datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
|
||||||
datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
|
datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
|
||||||
block.setNumBytes(ackedBlockLength);
|
block.setNumBytes(ackedBlockLength);
|
||||||
completeFile(client, namenode, src, clientName, block, fileId);
|
completeFile(client, namenode, src, clientName, block, fileId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,9 +33,12 @@ import java.lang.reflect.InvocationTargetException;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.IdentityHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.commons.lang3.ArrayUtils;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||||
import org.apache.hadoop.crypto.Encryptor;
|
import org.apache.hadoop.crypto.Encryptor;
|
||||||
|
@ -47,6 +50,8 @@ import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionUtils;
|
import org.apache.hadoop.hbase.client.ConnectionUtils;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||||
|
@ -128,8 +133,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
||||||
// Timeouts for communicating with DataNode for streaming writes/reads
|
// Timeouts for communicating with DataNode for streaming writes/reads
|
||||||
public static final int READ_TIMEOUT = 60 * 1000;
|
public static final int READ_TIMEOUT = 60 * 1000;
|
||||||
|
|
||||||
private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
|
|
||||||
|
|
||||||
private interface LeaseManager {
|
private interface LeaseManager {
|
||||||
|
|
||||||
void begin(DFSClient client, long inodeId);
|
void begin(DFSClient client, long inodeId);
|
||||||
|
@ -451,15 +454,20 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
||||||
|
|
||||||
private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
|
private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
|
||||||
boolean overwrite, boolean createParent, short replication, long blockSize,
|
boolean overwrite, boolean createParent, short replication, long blockSize,
|
||||||
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
|
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
|
||||||
|
StreamSlowMonitor monitor) throws IOException {
|
||||||
Configuration conf = dfs.getConf();
|
Configuration conf = dfs.getConf();
|
||||||
DFSClient client = dfs.getClient();
|
DFSClient client = dfs.getClient();
|
||||||
String clientName = client.getClientName();
|
String clientName = client.getClientName();
|
||||||
ClientProtocol namenode = client.getNamenode();
|
ClientProtocol namenode = client.getNamenode();
|
||||||
int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
|
int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
|
||||||
DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
|
DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
|
||||||
DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
|
ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
|
||||||
|
Set<DatanodeInfo> toExcludeNodes =
|
||||||
|
new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
|
||||||
for (int retry = 0;; retry++) {
|
for (int retry = 0;; retry++) {
|
||||||
|
LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src,
|
||||||
|
toExcludeNodes, retry);
|
||||||
HdfsFileStatus stat;
|
HdfsFileStatus stat;
|
||||||
try {
|
try {
|
||||||
stat = FILE_CREATOR.create(namenode, src,
|
stat = FILE_CREATOR.create(namenode, src,
|
||||||
|
@ -479,24 +487,26 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
||||||
List<Future<Channel>> futureList = null;
|
List<Future<Channel>> futureList = null;
|
||||||
try {
|
try {
|
||||||
DataChecksum summer = createChecksum(client);
|
DataChecksum summer = createChecksum(client);
|
||||||
locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes,
|
locatedBlock = namenode.addBlock(src, client.getClientName(), null,
|
||||||
stat.getFileId(), null, null);
|
toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
|
||||||
List<Channel> datanodeList = new ArrayList<>();
|
Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
|
||||||
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
|
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
|
||||||
PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
|
PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
|
||||||
for (int i = 0, n = futureList.size(); i < n; i++) {
|
for (int i = 0, n = futureList.size(); i < n; i++) {
|
||||||
|
DatanodeInfo datanodeInfo = locatedBlock.getLocations()[i];
|
||||||
try {
|
try {
|
||||||
datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
|
datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// exclude the broken DN next time
|
// exclude the broken DN next time
|
||||||
excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
|
toExcludeNodes.add(datanodeInfo);
|
||||||
|
excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error");
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Encryptor encryptor = createEncryptor(conf, stat, client);
|
Encryptor encryptor = createEncryptor(conf, stat, client);
|
||||||
FanOutOneBlockAsyncDFSOutput output =
|
FanOutOneBlockAsyncDFSOutput output =
|
||||||
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
|
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
|
||||||
stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
|
stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
|
||||||
succ = true;
|
succ = true;
|
||||||
return output;
|
return output;
|
||||||
} catch (RemoteException e) {
|
} catch (RemoteException e) {
|
||||||
|
@ -547,14 +557,15 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
|
||||||
*/
|
*/
|
||||||
public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
|
public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
|
||||||
boolean overwrite, boolean createParent, short replication, long blockSize,
|
boolean overwrite, boolean createParent, short replication, long blockSize,
|
||||||
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
|
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
|
||||||
|
final StreamSlowMonitor monitor) throws IOException {
|
||||||
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
|
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
|
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
|
||||||
throws IOException, UnresolvedLinkException {
|
throws IOException, UnresolvedLinkException {
|
||||||
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
|
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
|
||||||
blockSize, eventLoopGroup, channelClass);
|
blockSize, eventLoopGroup, channelClass, monitor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,114 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.asyncfs.monitor;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The class to manage the excluded datanodes of the WALs on the regionserver.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class ExcludeDatanodeManager implements ConfigurationObserver {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(ExcludeDatanodeManager.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure for the max count the excluded datanodes.
|
||||||
|
*/
|
||||||
|
public static final String WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY =
|
||||||
|
"hbase.regionserver.async.wal.max.exclude.datanode.count";
|
||||||
|
public static final int DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT = 3;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure for the TTL time of the datanodes excluded
|
||||||
|
*/
|
||||||
|
public static final String WAL_EXCLUDE_DATANODE_TTL_KEY =
|
||||||
|
"hbase.regionserver.async.wal.exclude.datanode.info.ttl.hour";
|
||||||
|
public static final int DEFAULT_WAL_EXCLUDE_DATANODE_TTL = 6; // 6 hours
|
||||||
|
|
||||||
|
private volatile Cache<DatanodeInfo, Long> excludeDNsCache;
|
||||||
|
private final int maxExcludeDNCount;
|
||||||
|
private final Configuration conf;
|
||||||
|
// This is a map of providerId->StreamSlowMonitor
|
||||||
|
private final Map<String, StreamSlowMonitor> streamSlowMonitors =
|
||||||
|
new ConcurrentHashMap<>(1);
|
||||||
|
|
||||||
|
public ExcludeDatanodeManager(Configuration conf) {
|
||||||
|
this.conf = conf;
|
||||||
|
this.maxExcludeDNCount = conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
|
||||||
|
DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT);
|
||||||
|
this.excludeDNsCache = CacheBuilder.newBuilder()
|
||||||
|
.expireAfterWrite(this.conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY,
|
||||||
|
DEFAULT_WAL_EXCLUDE_DATANODE_TTL), TimeUnit.HOURS)
|
||||||
|
.maximumSize(this.maxExcludeDNCount)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try to add a datanode to the regionserver excluding cache
|
||||||
|
* @param datanodeInfo the datanode to be added to the excluded cache
|
||||||
|
* @param cause the cause that the datanode is hope to be excluded
|
||||||
|
* @return True if the datanode is added to the regionserver excluding cache, false otherwise
|
||||||
|
*/
|
||||||
|
public boolean tryAddExcludeDN(DatanodeInfo datanodeInfo, String cause) {
|
||||||
|
boolean alreadyMarkedSlow = getExcludeDNs().containsKey(datanodeInfo);
|
||||||
|
if (!alreadyMarkedSlow) {
|
||||||
|
excludeDNsCache.put(datanodeInfo, EnvironmentEdgeManager.currentTime());
|
||||||
|
LOG.info(
|
||||||
|
"Added datanode: {} to exclude cache by [{}] success, current excludeDNsCache size={}",
|
||||||
|
datanodeInfo, cause, excludeDNsCache.size());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
LOG.debug("Try add datanode {} to exclude cache by [{}] failed, "
|
||||||
|
+ "current exclude DNs are {}", datanodeInfo, cause, getExcludeDNs().keySet());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public StreamSlowMonitor getStreamSlowMonitor(String name) {
|
||||||
|
String key = name == null || name.isEmpty() ? "defaultMonitorName" : name;
|
||||||
|
return streamSlowMonitors
|
||||||
|
.computeIfAbsent(key, k -> new StreamSlowMonitor(conf, key, this));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<DatanodeInfo, Long> getExcludeDNs() {
|
||||||
|
return excludeDNsCache.asMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onConfigurationChange(Configuration conf) {
|
||||||
|
for (StreamSlowMonitor monitor : streamSlowMonitors.values()) {
|
||||||
|
monitor.onConfigurationChange(conf);
|
||||||
|
}
|
||||||
|
this.excludeDNsCache = CacheBuilder.newBuilder().expireAfterWrite(
|
||||||
|
this.conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, DEFAULT_WAL_EXCLUDE_DATANODE_TTL),
|
||||||
|
TimeUnit.HOURS).maximumSize(this.conf
|
||||||
|
.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY, DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,217 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.asyncfs.monitor;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.DEFAULT_WAL_EXCLUDE_DATANODE_TTL;
|
||||||
|
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT;
|
||||||
|
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.WAL_EXCLUDE_DATANODE_TTL_KEY;
|
||||||
|
import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY;
|
||||||
|
|
||||||
|
import java.util.Deque;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class for monitor the wal file flush performance.
|
||||||
|
* Each active wal file has a StreamSlowMonitor.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class StreamSlowMonitor implements ConfigurationObserver {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(StreamSlowMonitor.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure for the min count for a datanode detected slow.
|
||||||
|
* If a datanode is detected slow times up to this count, then it will be added to the exclude
|
||||||
|
* datanode cache by {@link ExcludeDatanodeManager#tryAddExcludeDN(DatanodeInfo, String)}
|
||||||
|
* of this regionsever.
|
||||||
|
*/
|
||||||
|
private static final String WAL_SLOW_DETECT_MIN_COUNT_KEY =
|
||||||
|
"hbase.regionserver.async.wal.min.slow.detect.count";
|
||||||
|
private static final int DEFAULT_WAL_SLOW_DETECT_MIN_COUNT = 3;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure for the TTL of the data that a datanode detected slow.
|
||||||
|
*/
|
||||||
|
private static final String WAL_SLOW_DETECT_DATA_TTL_KEY =
|
||||||
|
"hbase.regionserver.async.wal.slow.detect.data.ttl.ms";
|
||||||
|
private static final long DEFAULT_WAL_SLOW_DETECT_DATA_TTL = 10 * 60 * 1000; // 10min in ms
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure for the speed check of packet min length.
|
||||||
|
* For packets whose data length smaller than this value, check slow by processing time.
|
||||||
|
* While for packets whose data length larger than this value, check slow by flushing speed.
|
||||||
|
*/
|
||||||
|
private static final String DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY =
|
||||||
|
"hbase.regionserver.async.wal.datanode.slow.check.speed.packet.data.length.min";
|
||||||
|
private static final long DEFAULT_DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH =
|
||||||
|
64 * 1024; //64KB
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure for the slow packet process time, a duration from send to ACK.
|
||||||
|
* The processing time check is for packets that data length smaller than
|
||||||
|
* {@link StreamSlowMonitor#DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY}
|
||||||
|
*/
|
||||||
|
public static final String DATANODE_SLOW_PACKET_PROCESS_TIME_KEY =
|
||||||
|
"hbase.regionserver.async.wal.datanode.slow.packet.process.time.millis";
|
||||||
|
private static final long DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME = 6000; // 6s in ms
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure for the check of large packet(which is configured by
|
||||||
|
* {@link StreamSlowMonitor#DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY}) flush speed.
|
||||||
|
* e.g. If the configured slow packet process time is smaller than 10s, then here 20KB/s means
|
||||||
|
* 64KB should be processed in less than 3.2s.
|
||||||
|
*/
|
||||||
|
private static final String DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY =
|
||||||
|
"hbase.regionserver.async.wal.datanode.slow.packet.speed.min.kbs";
|
||||||
|
private static final double DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED = 20; // 20KB/s
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
// this is a map of datanodeInfo->queued slow PacketAckData
|
||||||
|
private final LoadingCache<DatanodeInfo, Deque<PacketAckData>> datanodeSlowDataQueue;
|
||||||
|
private final ExcludeDatanodeManager excludeDatanodeManager;
|
||||||
|
|
||||||
|
private int minSlowDetectCount;
|
||||||
|
private long slowDataTtl;
|
||||||
|
private long slowPacketAckMs;
|
||||||
|
private double minPacketFlushSpeedKBs;
|
||||||
|
private long minLengthForSpeedCheck;
|
||||||
|
|
||||||
|
public StreamSlowMonitor(Configuration conf, String name,
|
||||||
|
ExcludeDatanodeManager excludeDatanodeManager) {
|
||||||
|
setConf(conf);
|
||||||
|
this.name = name;
|
||||||
|
this.excludeDatanodeManager = excludeDatanodeManager;
|
||||||
|
this.datanodeSlowDataQueue = CacheBuilder.newBuilder()
|
||||||
|
.maximumSize(conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
|
||||||
|
DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT))
|
||||||
|
.expireAfterWrite(conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY,
|
||||||
|
DEFAULT_WAL_EXCLUDE_DATANODE_TTL), TimeUnit.HOURS)
|
||||||
|
.build(new CacheLoader<DatanodeInfo, Deque<PacketAckData>>() {
|
||||||
|
@Override
|
||||||
|
public Deque<PacketAckData> load(DatanodeInfo key) throws Exception {
|
||||||
|
return new ConcurrentLinkedDeque<>();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
LOG.info("New stream slow monitor {}", this.name);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static StreamSlowMonitor create(Configuration conf, String name) {
|
||||||
|
return new StreamSlowMonitor(conf, name, new ExcludeDatanodeManager(conf));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the packet process time shows that the relevant datanode is a slow node.
|
||||||
|
* @param datanodeInfo the datanode that processed the packet
|
||||||
|
* @param packetDataLen the data length of the packet (in bytes)
|
||||||
|
* @param processTimeMs the process time (in ms) of the packet on the datanode,
|
||||||
|
* @param lastAckTimestamp the last acked timestamp of the packet on another datanode
|
||||||
|
* @param unfinished if the packet is unfinished flushed to the datanode replicas
|
||||||
|
*/
|
||||||
|
public void checkProcessTimeAndSpeed(DatanodeInfo datanodeInfo, long packetDataLen,
|
||||||
|
long processTimeMs, long lastAckTimestamp, int unfinished) {
|
||||||
|
long current = EnvironmentEdgeManager.currentTime();
|
||||||
|
// Here are two conditions used to determine whether a datanode is slow,
|
||||||
|
// 1. For small packet, we just have a simple time limit, without considering
|
||||||
|
// the size of the packet.
|
||||||
|
// 2. For large packet, we will calculate the speed, and check if the speed is too slow.
|
||||||
|
boolean slow = (packetDataLen <= minLengthForSpeedCheck && processTimeMs > slowPacketAckMs) || (
|
||||||
|
packetDataLen > minLengthForSpeedCheck
|
||||||
|
&& (double) packetDataLen / processTimeMs < minPacketFlushSpeedKBs);
|
||||||
|
if (slow) {
|
||||||
|
// Check if large diff ack timestamp between replicas,
|
||||||
|
// should try to avoid misjudgments that caused by GC STW.
|
||||||
|
if ((lastAckTimestamp > 0 && current - lastAckTimestamp > slowPacketAckMs / 2) || (
|
||||||
|
lastAckTimestamp <= 0 && unfinished == 0)) {
|
||||||
|
LOG.info("Slow datanode: {}, data length={}, duration={}ms, unfinishedReplicas={}, "
|
||||||
|
+ "lastAckTimestamp={}, monitor name: {}", datanodeInfo, packetDataLen, processTimeMs,
|
||||||
|
unfinished, lastAckTimestamp, this.name);
|
||||||
|
if (addSlowAckData(datanodeInfo, packetDataLen, processTimeMs)) {
|
||||||
|
excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "slow packet ack");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onConfigurationChange(Configuration conf) {
|
||||||
|
setConf(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean addSlowAckData(DatanodeInfo datanodeInfo, long dataLength, long processTime) {
|
||||||
|
Deque<PacketAckData> slowDNQueue = datanodeSlowDataQueue.getUnchecked(datanodeInfo);
|
||||||
|
long current = EnvironmentEdgeManager.currentTime();
|
||||||
|
while (!slowDNQueue.isEmpty() && (current - slowDNQueue.getFirst().getTimestamp() > slowDataTtl
|
||||||
|
|| slowDNQueue.size() >= minSlowDetectCount)) {
|
||||||
|
slowDNQueue.removeFirst();
|
||||||
|
}
|
||||||
|
slowDNQueue.addLast(new PacketAckData(dataLength, processTime));
|
||||||
|
return slowDNQueue.size() >= minSlowDetectCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setConf(Configuration conf) {
|
||||||
|
this.minSlowDetectCount = conf.getInt(WAL_SLOW_DETECT_MIN_COUNT_KEY,
|
||||||
|
DEFAULT_WAL_SLOW_DETECT_MIN_COUNT);
|
||||||
|
this.slowDataTtl = conf.getLong(WAL_SLOW_DETECT_DATA_TTL_KEY, DEFAULT_WAL_SLOW_DETECT_DATA_TTL);
|
||||||
|
this.slowPacketAckMs = conf.getLong(DATANODE_SLOW_PACKET_PROCESS_TIME_KEY,
|
||||||
|
DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME);
|
||||||
|
this.minLengthForSpeedCheck = conf.getLong(
|
||||||
|
DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY,
|
||||||
|
DEFAULT_DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH);
|
||||||
|
this.minPacketFlushSpeedKBs = conf.getDouble(DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY,
|
||||||
|
DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ExcludeDatanodeManager getExcludeDatanodeManager() {
|
||||||
|
return excludeDatanodeManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class PacketAckData {
|
||||||
|
private final long dataLength;
|
||||||
|
private final long processTime;
|
||||||
|
private final long timestamp;
|
||||||
|
|
||||||
|
public PacketAckData(long dataLength, long processTime) {
|
||||||
|
this.dataLength = dataLength;
|
||||||
|
this.processTime = processTime;
|
||||||
|
this.timestamp = EnvironmentEdgeManager.currentTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getDataLength() {
|
||||||
|
return dataLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getProcessTime() {
|
||||||
|
return processTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTimestamp() {
|
||||||
|
return timestamp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,87 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.io.asyncfs;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ SmallTests.class })
|
||||||
|
public class TestExcludeDatanodeManager {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestExcludeDatanodeManager.class);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExcludeSlowDNBySpeed() {
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
ExcludeDatanodeManager excludeDatanodeManager = new ExcludeDatanodeManager(conf);
|
||||||
|
StreamSlowMonitor streamSlowDNsMonitor =
|
||||||
|
excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
|
||||||
|
assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
|
||||||
|
DatanodeInfo datanodeInfo =
|
||||||
|
new DatanodeInfo.DatanodeInfoBuilder().setIpAddr("0.0.0.0").setHostName("hostname1")
|
||||||
|
.setDatanodeUuid("uuid1").setXferPort(111).setInfoPort(222).setInfoSecurePort(333)
|
||||||
|
.setIpcPort(444).setNetworkLocation("location1").build();
|
||||||
|
streamSlowDNsMonitor
|
||||||
|
.checkProcessTimeAndSpeed(datanodeInfo, 100000, 5100,
|
||||||
|
System.currentTimeMillis() - 5100, 0);
|
||||||
|
streamSlowDNsMonitor
|
||||||
|
.checkProcessTimeAndSpeed(datanodeInfo, 100000, 5100,
|
||||||
|
System.currentTimeMillis() - 5100, 0);
|
||||||
|
streamSlowDNsMonitor
|
||||||
|
.checkProcessTimeAndSpeed(datanodeInfo, 100000, 5100,
|
||||||
|
System.currentTimeMillis() - 5100, 0);
|
||||||
|
assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
|
||||||
|
assertTrue(excludeDatanodeManager.getExcludeDNs().containsKey(datanodeInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExcludeSlowDNByProcessTime() {
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
ExcludeDatanodeManager excludeDatanodeManager = new ExcludeDatanodeManager(conf);
|
||||||
|
StreamSlowMonitor streamSlowDNsMonitor =
|
||||||
|
excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
|
||||||
|
assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
|
||||||
|
DatanodeInfo datanodeInfo =
|
||||||
|
new DatanodeInfo.DatanodeInfoBuilder().setIpAddr("0.0.0.0").setHostName("hostname1")
|
||||||
|
.setDatanodeUuid("uuid1").setXferPort(111).setInfoPort(222).setInfoSecurePort(333)
|
||||||
|
.setIpcPort(444).setNetworkLocation("location1").build();
|
||||||
|
streamSlowDNsMonitor
|
||||||
|
.checkProcessTimeAndSpeed(datanodeInfo, 5000, 7000,
|
||||||
|
System.currentTimeMillis() - 7000, 0);
|
||||||
|
streamSlowDNsMonitor
|
||||||
|
.checkProcessTimeAndSpeed(datanodeInfo, 5000, 7000,
|
||||||
|
System.currentTimeMillis() - 7000, 0);
|
||||||
|
streamSlowDNsMonitor
|
||||||
|
.checkProcessTimeAndSpeed(datanodeInfo, 5000, 7000,
|
||||||
|
System.currentTimeMillis() - 7000, 0);
|
||||||
|
assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
|
||||||
|
assertTrue(excludeDatanodeManager.getExcludeDNs().containsKey(datanodeInfo));
|
||||||
|
}
|
||||||
|
}
|
|
@ -39,6 +39,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
@ -78,6 +81,8 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
|
||||||
|
|
||||||
private static int READ_TIMEOUT_MS = 2000;
|
private static int READ_TIMEOUT_MS = 2000;
|
||||||
|
|
||||||
|
private static StreamSlowMonitor MONITOR;
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
|
|
||||||
|
@ -88,6 +93,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
|
||||||
FS = CLUSTER.getFileSystem();
|
FS = CLUSTER.getFileSystem();
|
||||||
EVENT_LOOP_GROUP = new NioEventLoopGroup();
|
EVENT_LOOP_GROUP = new NioEventLoopGroup();
|
||||||
CHANNEL_CLASS = NioSocketChannel.class;
|
CHANNEL_CLASS = NioSocketChannel.class;
|
||||||
|
MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
@ -133,7 +139,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
|
||||||
Path f = new Path("/" + name.getMethodName());
|
Path f = new Path("/" + name.getMethodName());
|
||||||
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
||||||
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
|
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
|
||||||
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
|
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
|
||||||
writeAndVerify(FS, f, out);
|
writeAndVerify(FS, f, out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,7 +148,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
|
||||||
Path f = new Path("/" + name.getMethodName());
|
Path f = new Path("/" + name.getMethodName());
|
||||||
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
||||||
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
|
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
|
||||||
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
|
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
|
||||||
byte[] b = new byte[10];
|
byte[] b = new byte[10];
|
||||||
ThreadLocalRandom.current().nextBytes(b);
|
ThreadLocalRandom.current().nextBytes(b);
|
||||||
out.write(b, 0, b.length);
|
out.write(b, 0, b.length);
|
||||||
|
@ -171,7 +177,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
|
||||||
Path f = new Path("/" + name.getMethodName());
|
Path f = new Path("/" + name.getMethodName());
|
||||||
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
||||||
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
|
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
|
||||||
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
|
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
|
||||||
Thread.sleep(READ_TIMEOUT_MS * 2);
|
Thread.sleep(READ_TIMEOUT_MS * 2);
|
||||||
// the connection to datanode should still alive.
|
// the connection to datanode should still alive.
|
||||||
writeAndVerify(FS, f, out);
|
writeAndVerify(FS, f, out);
|
||||||
|
@ -186,7 +192,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
|
||||||
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
||||||
try {
|
try {
|
||||||
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
|
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
|
||||||
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
|
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
|
||||||
fail("should fail with parent does not exist");
|
fail("should fail with parent does not exist");
|
||||||
} catch (RemoteException e) {
|
} catch (RemoteException e) {
|
||||||
LOG.info("expected exception caught", e);
|
LOG.info("expected exception caught", e);
|
||||||
|
@ -209,7 +215,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
|
||||||
Path f = new Path("/test");
|
Path f = new Path("/test");
|
||||||
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
||||||
try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
|
try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
|
||||||
f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
|
f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) {
|
||||||
// should exclude the dead dn when retry so here we only have 2 DNs in pipeline
|
// should exclude the dead dn when retry so here we only have 2 DNs in pipeline
|
||||||
assertEquals(2, output.getPipeline().length);
|
assertEquals(2, output.getPipeline().length);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -217,12 +223,42 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExcludeFailedConnectToDatanode()
|
||||||
|
throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
|
||||||
|
InvocationTargetException, InterruptedException, NoSuchFieldException {
|
||||||
|
Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
|
||||||
|
xceiverServerDaemonField.setAccessible(true);
|
||||||
|
Class<?> xceiverServerClass =
|
||||||
|
Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
|
||||||
|
Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
|
||||||
|
numPeersMethod.setAccessible(true);
|
||||||
|
// make one datanode broken
|
||||||
|
DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
|
||||||
|
Path f = new Path("/test");
|
||||||
|
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
||||||
|
ExcludeDatanodeManager excludeDatanodeManager =
|
||||||
|
new ExcludeDatanodeManager(HBaseConfiguration.create());
|
||||||
|
StreamSlowMonitor streamSlowDNsMonitor =
|
||||||
|
excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
|
||||||
|
assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
|
||||||
|
try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
|
||||||
|
f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop,
|
||||||
|
CHANNEL_CLASS, streamSlowDNsMonitor)) {
|
||||||
|
// should exclude the dead dn when retry so here we only have 2 DNs in pipeline
|
||||||
|
assertEquals(2, output.getPipeline().length);
|
||||||
|
assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
|
||||||
|
} finally {
|
||||||
|
CLUSTER.restartDataNode(dnProp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
|
public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
|
||||||
Path f = new Path("/" + name.getMethodName());
|
Path f = new Path("/" + name.getMethodName());
|
||||||
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
||||||
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
|
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
|
||||||
false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
|
false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR);
|
||||||
byte[] b = new byte[50 * 1024 * 1024];
|
byte[] b = new byte[50 * 1024 * 1024];
|
||||||
ThreadLocalRandom.current().nextBytes(b);
|
ThreadLocalRandom.current().nextBytes(b);
|
||||||
out.write(b);
|
out.write(b);
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
|
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
|
@ -49,10 +50,13 @@ public class TestLocalAsyncOutput {
|
||||||
|
|
||||||
private static final HBaseCommonTestingUtil TEST_UTIL = new HBaseCommonTestingUtil();
|
private static final HBaseCommonTestingUtil TEST_UTIL = new HBaseCommonTestingUtil();
|
||||||
|
|
||||||
|
private static StreamSlowMonitor MONITOR;
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDownAfterClass() throws IOException {
|
public static void tearDownAfterClass() throws IOException {
|
||||||
TEST_UTIL.cleanupTestDir();
|
TEST_UTIL.cleanupTestDir();
|
||||||
GROUP.shutdownGracefully();
|
GROUP.shutdownGracefully();
|
||||||
|
MONITOR = StreamSlowMonitor.create(TEST_UTIL.getConfiguration(), "testMonitor");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -61,7 +65,7 @@ public class TestLocalAsyncOutput {
|
||||||
Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
|
Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
|
||||||
FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
|
FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
|
||||||
AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
|
AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
|
||||||
fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS);
|
fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR);
|
||||||
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
|
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Method;
|
|
||||||
import java.net.BindException;
|
import java.net.BindException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -40,6 +39,7 @@ import org.apache.hadoop.crypto.key.KeyProvider;
|
||||||
import org.apache.hadoop.crypto.key.KeyProviderFactory;
|
import org.apache.hadoop.crypto.key.KeyProviderFactory;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
|
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
|
||||||
import org.apache.hadoop.hbase.security.SecurityConstants;
|
import org.apache.hadoop.hbase.security.SecurityConstants;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
@ -102,6 +102,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
|
||||||
|
|
||||||
private static String TEST_KEY_NAME = "test_key";
|
private static String TEST_KEY_NAME = "test_key";
|
||||||
|
|
||||||
|
private static StreamSlowMonitor MONITOR;
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
|
|
||||||
|
@ -187,6 +189,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
|
||||||
HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(),
|
HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(),
|
||||||
PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm());
|
PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm());
|
||||||
HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class);
|
HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class);
|
||||||
|
MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
|
@ -252,7 +255,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
|
||||||
private void test(Path file) throws IOException, InterruptedException, ExecutionException {
|
private void test(Path file) throws IOException, InterruptedException, ExecutionException {
|
||||||
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
|
||||||
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
|
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
|
||||||
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
|
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
|
||||||
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
|
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.codec.Codec;
|
import org.apache.hadoop.hbase.codec.Codec;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||||
import org.apache.hadoop.hbase.io.crypto.Cipher;
|
import org.apache.hadoop.hbase.io.crypto.Cipher;
|
||||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||||
|
@ -47,7 +48,6 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
|
||||||
|
|
||||||
|
@ -169,7 +169,8 @@ public abstract class AbstractProtobufLogWriter {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
|
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
|
||||||
long blocksize) throws IOException, StreamLacksCapabilityException {
|
long blocksize, StreamSlowMonitor monitor) throws IOException,
|
||||||
|
StreamLacksCapabilityException {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
boolean doCompress = initializeCompressionContext(conf, path);
|
boolean doCompress = initializeCompressionContext(conf, path);
|
||||||
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
|
this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
|
||||||
|
@ -177,7 +178,7 @@ public abstract class AbstractProtobufLogWriter {
|
||||||
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
|
short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
|
||||||
CommonFSUtils.getDefaultReplication(fs, path));
|
CommonFSUtils.getDefaultReplication(fs, path));
|
||||||
|
|
||||||
initOutput(fs, path, overwritable, bufferSize, replication, blocksize);
|
initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);
|
||||||
|
|
||||||
boolean doTagCompress = doCompress &&
|
boolean doTagCompress = doCompress &&
|
||||||
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
|
conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
|
||||||
|
@ -266,7 +267,8 @@ public abstract class AbstractProtobufLogWriter {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
|
protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
|
||||||
short replication, long blockSize) throws IOException, StreamLacksCapabilityException;
|
short replication, long blockSize, StreamSlowMonitor monitor)
|
||||||
|
throws IOException, StreamLacksCapabilityException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* return the file length after written.
|
* return the file length after written.
|
||||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
|
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
|
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
|
||||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||||
|
@ -200,22 +201,26 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
|
|
||||||
private final int waitOnShutdownInSeconds;
|
private final int waitOnShutdownInSeconds;
|
||||||
|
|
||||||
|
private final StreamSlowMonitor streamSlowMonitor;
|
||||||
|
|
||||||
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
|
public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
|
||||||
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
|
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
|
||||||
String prefix, String suffix, EventLoopGroup eventLoopGroup,
|
String prefix, String suffix, EventLoopGroup eventLoopGroup,
|
||||||
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
|
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
|
||||||
this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
|
this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
|
||||||
eventLoopGroup, channelClass);
|
eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, "monitorForSuffix"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
|
public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
|
||||||
String archiveDir, Configuration conf, List<WALActionsListener> listeners,
|
String archiveDir, Configuration conf, List<WALActionsListener> listeners,
|
||||||
boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
|
boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
|
||||||
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
|
Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
|
||||||
|
throws FailedLogCloseException, IOException {
|
||||||
super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
|
super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
|
||||||
suffix);
|
suffix);
|
||||||
this.eventLoopGroup = eventLoopGroup;
|
this.eventLoopGroup = eventLoopGroup;
|
||||||
this.channelClass = channelClass;
|
this.channelClass = channelClass;
|
||||||
|
this.streamSlowMonitor = monitor;
|
||||||
Supplier<Boolean> hasConsumerTask;
|
Supplier<Boolean> hasConsumerTask;
|
||||||
if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
|
if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
|
||||||
this.consumeExecutor = eventLoopGroup.next();
|
this.consumeExecutor = eventLoopGroup.next();
|
||||||
|
@ -712,7 +717,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
|
|
||||||
protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
|
protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException {
|
||||||
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
|
return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
|
||||||
eventLoopGroup, channelClass);
|
eventLoopGroup, channelClass, streamSlowMonitor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.io.ByteBufferWriter;
|
import org.apache.hadoop.hbase.io.ByteBufferWriter;
|
||||||
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
|
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
|
||||||
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
|
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||||
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
|
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
|
||||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
|
@ -176,9 +177,10 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
|
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
|
||||||
short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
|
short replication, long blockSize, StreamSlowMonitor monitor) throws IOException,
|
||||||
|
StreamLacksCapabilityException {
|
||||||
this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
|
this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
|
||||||
blockSize, eventLoopGroup, channelClass);
|
blockSize, eventLoopGroup, channelClass, monitor);
|
||||||
this.asyncOutputWrapper = new OutputStreamWrapper(output);
|
this.asyncOutputWrapper = new OutputStreamWrapper(output);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,13 +20,13 @@ package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
|
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.StreamCapabilities;
|
import org.apache.hadoop.fs.StreamCapabilities;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.util.AtomicUtils;
|
import org.apache.hadoop.hbase.util.AtomicUtils;
|
||||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||||
|
@ -106,7 +106,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
|
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
|
||||||
short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
|
short replication, long blockSize, StreamSlowMonitor monitor) throws IOException,
|
||||||
|
StreamLacksCapabilityException {
|
||||||
FSDataOutputStreamBuilder<?, ?> builder = fs
|
FSDataOutputStreamBuilder<?, ?> builder = fs
|
||||||
.createFile(path)
|
.createFile(path)
|
||||||
.overwrite(overwritable)
|
.overwrite(overwritable)
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput;
|
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput;
|
||||||
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
|
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
|
||||||
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper;
|
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
|
import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
|
import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
||||||
|
@ -57,8 +58,8 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
|
||||||
* @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
|
* @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
|
||||||
* meet the needs of the given Writer implementation.
|
* meet the needs of the given Writer implementation.
|
||||||
*/
|
*/
|
||||||
void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize)
|
void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize,
|
||||||
throws IOException, CommonFSUtils.StreamLacksCapabilityException;
|
StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException;
|
||||||
}
|
}
|
||||||
|
|
||||||
private EventLoopGroup eventLoopGroup;
|
private EventLoopGroup eventLoopGroup;
|
||||||
|
@ -68,10 +69,10 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
|
||||||
@Override
|
@Override
|
||||||
protected AsyncFSWAL createWAL() throws IOException {
|
protected AsyncFSWAL createWAL() throws IOException {
|
||||||
return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
|
return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
|
||||||
CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
|
CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
|
||||||
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
|
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
|
||||||
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
|
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
|
||||||
channelClass);
|
channelClass, factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -89,7 +90,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
|
||||||
boolean overwritable, EventLoopGroup eventLoopGroup,
|
boolean overwritable, EventLoopGroup eventLoopGroup,
|
||||||
Class<? extends Channel> channelClass) throws IOException {
|
Class<? extends Channel> channelClass) throws IOException {
|
||||||
return createAsyncWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path),
|
return createAsyncWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path),
|
||||||
eventLoopGroup, channelClass);
|
eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, path.getName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -97,14 +98,14 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
|
||||||
*/
|
*/
|
||||||
public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
|
public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
|
||||||
boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup,
|
boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup,
|
||||||
Class<? extends Channel> channelClass) throws IOException {
|
Class<? extends Channel> channelClass, StreamSlowMonitor monitor) throws IOException {
|
||||||
// Configuration already does caching for the Class lookup.
|
// Configuration already does caching for the Class lookup.
|
||||||
Class<? extends AsyncWriter> logWriterClass = conf.getClass(
|
Class<? extends AsyncWriter> logWriterClass = conf.getClass(
|
||||||
WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class);
|
WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class);
|
||||||
try {
|
try {
|
||||||
AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
|
AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
|
||||||
.newInstance(eventLoopGroup, channelClass);
|
.newInstance(eventLoopGroup, channelClass);
|
||||||
writer.init(fs, path, conf, overwritable, blocksize);
|
writer.init(fs, path, conf, overwritable, blocksize, monitor);
|
||||||
return writer;
|
return writer;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
|
if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
|
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
||||||
|
@ -48,8 +49,8 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
|
||||||
* @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
|
* @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
|
||||||
* meet the needs of the given Writer implementation.
|
* meet the needs of the given Writer implementation.
|
||||||
*/
|
*/
|
||||||
void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize)
|
void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize,
|
||||||
throws IOException, CommonFSUtils.StreamLacksCapabilityException;
|
StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -76,7 +77,8 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
|
||||||
try {
|
try {
|
||||||
writer = logWriterClass.getDeclaredConstructor().newInstance();
|
writer = logWriterClass.getDeclaredConstructor().newInstance();
|
||||||
FileSystem rootFs = FileSystem.get(path.toUri(), conf);
|
FileSystem rootFs = FileSystem.get(path.toUri(), conf);
|
||||||
writer.init(rootFs, path, conf, overwritable, blocksize);
|
writer.init(rootFs, path, conf, overwritable, blocksize,
|
||||||
|
StreamSlowMonitor.create(conf, path.getName()));
|
||||||
return writer;
|
return writer;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
|
if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
|
||||||
|
|
|
@ -21,12 +21,12 @@ import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
|
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
|
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
|
||||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||||
|
@ -105,6 +105,8 @@ public class WALFactory {
|
||||||
|
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
|
|
||||||
|
private final ExcludeDatanodeManager excludeDatanodeManager;
|
||||||
|
|
||||||
// Used for the singleton WALFactory, see below.
|
// Used for the singleton WALFactory, see below.
|
||||||
private WALFactory(Configuration conf) {
|
private WALFactory(Configuration conf) {
|
||||||
// this code is duplicated here so we can keep our members final.
|
// this code is duplicated here so we can keep our members final.
|
||||||
|
@ -121,6 +123,7 @@ public class WALFactory {
|
||||||
provider = null;
|
provider = null;
|
||||||
factoryId = SINGLETON_ID;
|
factoryId = SINGLETON_ID;
|
||||||
this.abortable = null;
|
this.abortable = null;
|
||||||
|
this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
Providers getDefaultProvider() {
|
Providers getDefaultProvider() {
|
||||||
|
@ -197,6 +200,7 @@ public class WALFactory {
|
||||||
AbstractFSWALProvider.Reader.class);
|
AbstractFSWALProvider.Reader.class);
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.factoryId = factoryId;
|
this.factoryId = factoryId;
|
||||||
|
this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
|
||||||
this.abortable = abortable;
|
this.abortable = abortable;
|
||||||
// end required early initialization
|
// end required early initialization
|
||||||
if (conf.getBoolean(WAL_ENABLED, true)) {
|
if (conf.getBoolean(WAL_ENABLED, true)) {
|
||||||
|
@ -502,4 +506,8 @@ public class WALFactory {
|
||||||
public final WALProvider getMetaWALProvider() {
|
public final WALProvider getMetaWALProvider() {
|
||||||
return this.metaProvider.get();
|
return this.metaProvider.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ExcludeDatanodeManager getExcludeDatanodeManager() {
|
||||||
|
return excludeDatanodeManager;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||||
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
|
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
|
||||||
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
|
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
|
||||||
|
@ -1179,7 +1180,8 @@ public abstract class AbstractTestWALReplay {
|
||||||
throws IOException, StreamLacksCapabilityException {
|
throws IOException, StreamLacksCapabilityException {
|
||||||
fs.mkdirs(file.getParent());
|
fs.mkdirs(file.getParent());
|
||||||
ProtobufLogWriter writer = new ProtobufLogWriter();
|
ProtobufLogWriter writer = new ProtobufLogWriter();
|
||||||
writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file));
|
writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file),
|
||||||
|
StreamSlowMonitor.create(conf, "testMonitor"));
|
||||||
for (FSWALEntry entry : entries) {
|
for (FSWALEntry entry : entries) {
|
||||||
writer.append(entry);
|
writer.append(entry);
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.master.HMaster;
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure;
|
import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure;
|
||||||
|
@ -161,7 +162,9 @@ public class TestRecoverStandbyProcedure {
|
||||||
for (int i = 0; i < WAL_NUMBER; i++) {
|
for (int i = 0; i < WAL_NUMBER; i++) {
|
||||||
try (ProtobufLogWriter writer = new ProtobufLogWriter()) {
|
try (ProtobufLogWriter writer = new ProtobufLogWriter()) {
|
||||||
Path wal = new Path(peerRemoteWALDir, "srv1,8888." + i + ".syncrep");
|
Path wal = new Path(peerRemoteWALDir, "srv1,8888." + i + ".syncrep");
|
||||||
writer.init(fs, wal, conf, true, WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir));
|
writer.init(fs, wal, conf, true,
|
||||||
|
WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir),
|
||||||
|
StreamSlowMonitor.create(conf, "defaultMonitor"));
|
||||||
List<Entry> entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT);
|
List<Entry> entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT);
|
||||||
for (Entry entry : entries) {
|
for (Entry entry : entries) {
|
||||||
writer.append(entry);
|
writer.append(entry);
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
// imports for things that haven't moved from regionserver.wal yet.
|
// imports for things that haven't moved from regionserver.wal yet.
|
||||||
|
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
|
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
|
@ -212,7 +213,8 @@ public class IOTestProvider implements WALProvider {
|
||||||
LOG.info("creating new writer instance.");
|
LOG.info("creating new writer instance.");
|
||||||
final ProtobufLogWriter writer = new IOTestWriter();
|
final ProtobufLogWriter writer = new IOTestWriter();
|
||||||
try {
|
try {
|
||||||
writer.init(fs, path, conf, false, this.blocksize);
|
writer.init(fs, path, conf, false, this.blocksize,
|
||||||
|
StreamSlowMonitor.create(conf, path.getName()));
|
||||||
} catch (CommonFSUtils.StreamLacksCapabilityException exception) {
|
} catch (CommonFSUtils.StreamLacksCapabilityException exception) {
|
||||||
throw new IOException("Can't create writer instance because underlying FileSystem " +
|
throw new IOException("Can't create writer instance because underlying FileSystem " +
|
||||||
"doesn't support needed stream capabilities.", exception);
|
"doesn't support needed stream capabilities.", exception);
|
||||||
|
@ -240,7 +242,8 @@ public class IOTestProvider implements WALProvider {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
|
public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
|
||||||
long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException {
|
long blocksize, StreamSlowMonitor monitor) throws IOException,
|
||||||
|
CommonFSUtils.StreamLacksCapabilityException {
|
||||||
Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
|
Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
|
||||||
if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
|
if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
|
||||||
doAppends = doSyncs = true;
|
doAppends = doSyncs = true;
|
||||||
|
@ -252,7 +255,7 @@ public class IOTestProvider implements WALProvider {
|
||||||
}
|
}
|
||||||
LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") +
|
LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") +
|
||||||
" and syncs " + (doSyncs ? "enabled" : "disabled"));
|
" and syncs " + (doSyncs ? "enabled" : "disabled"));
|
||||||
super.init(fs, path, conf, overwritable, blocksize);
|
super.init(fs, path, conf, overwritable, blocksize, monitor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue