diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index 452da1ca4f6..5b713196d0b 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -47,11 +48,11 @@ public final class AsyncFSOutputHelper { */ public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, - Class channelClass) + Class channelClass, StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException { if (fs instanceof DistributedFileSystem) { return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, - overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass); + overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor); } final FSDataOutputStream out; int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 74e7f68b0ad..41e345f46b8 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -32,7 +32,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.Iterator; -import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -44,7 +44,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.Encryptor; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -67,6 +69,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; import org.apache.hbase.thirdparty.io.netty.channel.ChannelId; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker; import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import 
org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent; @@ -120,7 +123,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private final Encryptor encryptor; - private final List datanodeList; + private final Map datanodeInfoMap; private final DataChecksum summer; @@ -136,17 +139,22 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { // should be backed by a thread safe collection private final Set unfinishedReplicas; + private final long packetDataLen; + private final long flushTimestamp; + private long lastAckTimestamp = -1; public Callback(CompletableFuture future, long ackedLength, - Collection replicas) { + final Collection replicas, long packetDataLen) { this.future = future; this.ackedLength = ackedLength; + this.packetDataLen = packetDataLen; + this.flushTimestamp = EnvironmentEdgeManager.currentTime(); if (replicas.isEmpty()) { this.unfinishedReplicas = Collections.emptySet(); } else { this.unfinishedReplicas = Collections.newSetFromMap(new ConcurrentHashMap(replicas.size())); - replicas.stream().map(c -> c.id()).forEachOrdered(unfinishedReplicas::add); + replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add); } } } @@ -176,6 +184,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private volatile State state; + private final StreamSlowMonitor streamSlowMonitor; + // all lock-free to make it run faster private void completed(Channel channel) { for (Iterator iter = waitingAckQueue.iterator(); iter.hasNext();) { @@ -183,6 +193,10 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { // if the current unfinished replicas does not contain us then it means that we have already // acked this one, let's iterate to find the one we have not acked yet. if (c.unfinishedReplicas.remove(channel.id())) { + long current = EnvironmentEdgeManager.currentTime(); + streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen, + current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size()); + c.lastAckTimestamp = current; if (c.unfinishedReplicas.isEmpty()) { // we need to remove first before complete the future. 
It is possible that after we // complete the future the upper layer will call close immediately before we remove the @@ -245,7 +259,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { } break; } - datanodeList.forEach(ch -> ch.close()); + datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close); } @Sharable @@ -313,7 +327,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private void setupReceiver(int timeoutMs) { AckHandler ackHandler = new AckHandler(timeoutMs); - for (Channel ch : datanodeList) { + for (Channel ch : datanodeInfoMap.keySet()) { ch.pipeline().addLast( new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS), new ProtobufVarint32FrameDecoder(), @@ -324,8 +338,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { FanOutOneBlockAsyncDFSOutput(Configuration conf,DistributedFileSystem dfs, DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId, - LocatedBlock locatedBlock, Encryptor encryptor, List datanodeList, - DataChecksum summer, ByteBufAllocator alloc) { + LocatedBlock locatedBlock, Encryptor encryptor, Map datanodeInfoMap, + DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) { this.conf = conf; this.dfs = dfs; this.client = client; @@ -336,13 +350,14 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { this.block = locatedBlock.getBlock(); this.locations = locatedBlock.getLocations(); this.encryptor = encryptor; - this.datanodeList = datanodeList; + this.datanodeInfoMap = datanodeInfoMap; this.summer = summer; this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum()); this.alloc = alloc; this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize()); this.state = State.STREAMING; setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT)); + this.streamSlowMonitor = streamSlowMonitor; } @Override @@ -394,7 +409,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { ByteBuf headerBuf = alloc.buffer(headerLen); header.putInBuffer(headerBuf.nioBuffer(0, headerLen)); headerBuf.writerIndex(headerLen); - Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList); + Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, + datanodeInfoMap.keySet(), dataLen); waitingAckQueue.addLast(c); // recheck again after we pushed the callback to queue if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) { @@ -405,7 +421,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { } // TODO: we should perhaps measure time taken per DN here; // we could collect statistics per DN, and/or exclude bad nodes in createOutput. 
- datanodeList.forEach(ch -> { + datanodeInfoMap.keySet().forEach(ch -> { ch.write(headerBuf.retainedDuplicate()); ch.write(checksumBuf.retainedDuplicate()); ch.writeAndFlush(dataBuf.retainedDuplicate()); @@ -427,7 +443,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { long lengthAfterFlush = nextPacketOffsetInBlock + dataLen; Callback lastFlush = waitingAckQueue.peekLast(); if (lastFlush != null) { - Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList()); + Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen); waitingAckQueue.addLast(c); // recheck here if we have already removed the previous callback from the queue if (waitingAckQueue.peekFirst() == c) { @@ -527,8 +543,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { header.putInBuffer(headerBuf.nioBuffer(0, headerLen)); headerBuf.writerIndex(headerLen); CompletableFuture future = new CompletableFuture<>(); - waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList)); - datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate())); + waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0)); + datanodeInfoMap.keySet().forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate())); headerBuf.release(); try { future.get(); @@ -545,13 +561,14 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { * The close method when error occurred. Now we just call recoverFileLease. */ @Override + @SuppressWarnings("FutureReturnValueIgnored") public void recoverAndClose(CancelableProgressable reporter) throws IOException { if (buf != null) { buf.release(); buf = null; } - datanodeList.forEach(ch -> ch.close()); - datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly()); + datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close); + datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly()); endFileLease(client, fileId); RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf, reporter == null ? new CancelOnClose(client) : reporter); @@ -562,11 +579,12 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception. 
*/ @Override + @SuppressWarnings("FutureReturnValueIgnored") public void close() throws IOException { endBlock(); state = State.CLOSED; - datanodeList.forEach(ch -> ch.close()); - datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly()); + datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close); + datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly()); block.setNumBytes(ackedBlockLength); completeFile(client, namenode, src, clientName, block, fileId); } diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index f4a0005d7f8..7c62d67c6ce 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -33,9 +33,12 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.EnumSet; +import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.crypto.CryptoProtocolVersion; import org.apache.hadoop.crypto.Encryptor; @@ -47,6 +50,8 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSOutputStream; @@ -128,8 +133,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { // Timeouts for communicating with DataNode for streaming writes/reads public static final int READ_TIMEOUT = 60 * 1000; - private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0]; - private interface LeaseManager { void begin(DFSClient client, long inodeId); @@ -451,15 +454,20 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, boolean overwrite, boolean createParent, short replication, long blockSize, - EventLoopGroup eventLoopGroup, Class channelClass) throws IOException { + EventLoopGroup eventLoopGroup, Class channelClass, + StreamSlowMonitor monitor) throws IOException { Configuration conf = dfs.getConf(); DFSClient client = dfs.getClient(); String clientName = client.getClientName(); ClientProtocol namenode = client.getNamenode(); int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES); - DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY; + ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager(); + Set toExcludeNodes = + new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet()); for (int retry = 0;; retry++) { + LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src, + toExcludeNodes, retry); HdfsFileStatus stat; try { stat = FILE_CREATOR.create(namenode, src, @@ -479,24 +487,26 @@ public final class 
FanOutOneBlockAsyncDFSOutputHelper { List> futureList = null; try { DataChecksum summer = createChecksum(client); - locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes, - stat.getFileId(), null, null); - List datanodeList = new ArrayList<>(); + locatedBlock = namenode.addBlock(src, client.getClientName(), null, + toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null); + Map datanodes = new IdentityHashMap<>(); futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass); for (int i = 0, n = futureList.size(); i < n; i++) { + DatanodeInfo datanodeInfo = locatedBlock.getLocations()[i]; try { - datanodeList.add(futureList.get(i).syncUninterruptibly().getNow()); + datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo); } catch (Exception e) { // exclude the broken DN next time - excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]); + toExcludeNodes.add(datanodeInfo); + excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error"); throw e; } } Encryptor encryptor = createEncryptor(conf, stat, client); FanOutOneBlockAsyncDFSOutput output = new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, - stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC); + stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor); succ = true; return output; } catch (RemoteException e) { @@ -547,14 +557,15 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { */ public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, - EventLoopGroup eventLoopGroup, Class channelClass) throws IOException { + EventLoopGroup eventLoopGroup, Class channelClass, + final StreamSlowMonitor monitor) throws IOException { return new FileSystemLinkResolver() { @Override public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException, UnresolvedLinkException { return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, - blockSize, eventLoopGroup, channelClass); + blockSize, eventLoopGroup, channelClass, monitor); } @Override diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/monitor/ExcludeDatanodeManager.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/monitor/ExcludeDatanodeManager.java new file mode 100644 index 00000000000..80748cad609 --- /dev/null +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/monitor/ExcludeDatanodeManager.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.asyncfs.monitor; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.cache.Cache; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; + +/** + * The class to manage the excluded datanodes of the WALs on the regionserver. + */ +@InterfaceAudience.Private +public class ExcludeDatanodeManager implements ConfigurationObserver { + private static final Logger LOG = LoggerFactory.getLogger(ExcludeDatanodeManager.class); + + /** + * Configure the max count of excluded datanodes. + */ + public static final String WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY = + "hbase.regionserver.async.wal.max.exclude.datanode.count"; + public static final int DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT = 3; + + /** + * Configure the TTL of the excluded datanodes. + */ + public static final String WAL_EXCLUDE_DATANODE_TTL_KEY = + "hbase.regionserver.async.wal.exclude.datanode.info.ttl.hour"; + public static final int DEFAULT_WAL_EXCLUDE_DATANODE_TTL = 6; // 6 hours + + private volatile Cache<DatanodeInfo, Long> excludeDNsCache; + private final int maxExcludeDNCount; + private final Configuration conf; + // This is a map of providerId->StreamSlowMonitor + private final Map<String, StreamSlowMonitor> streamSlowMonitors = + new ConcurrentHashMap<>(1); + + public ExcludeDatanodeManager(Configuration conf) { + this.conf = conf; + this.maxExcludeDNCount = conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY, + DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT); + this.excludeDNsCache = CacheBuilder.newBuilder() + .expireAfterWrite(this.conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, + DEFAULT_WAL_EXCLUDE_DATANODE_TTL), TimeUnit.HOURS) + .maximumSize(this.maxExcludeDNCount) + .build(); + } + + /** + * Try to add a datanode to the regionserver's exclude cache. + * @param datanodeInfo the datanode to be added to the exclude cache + * @param cause the reason why the datanode should be excluded + * @return true if the datanode was added to the exclude cache, false otherwise + */ + public boolean tryAddExcludeDN(DatanodeInfo datanodeInfo, String cause) { + boolean alreadyMarkedSlow = getExcludeDNs().containsKey(datanodeInfo); + if (!alreadyMarkedSlow) { + excludeDNsCache.put(datanodeInfo, EnvironmentEdgeManager.currentTime()); + LOG.info( + "Added datanode: {} to exclude cache by [{}] success, current excludeDNsCache size={}", + datanodeInfo, cause, excludeDNsCache.size()); + return true; + } + LOG.debug("Try add datanode {} to exclude cache by [{}] failed, " + + "current exclude DNs are {}", datanodeInfo, cause, getExcludeDNs().keySet()); + return false; + } + + public StreamSlowMonitor getStreamSlowMonitor(String name) { + String key = name == null || name.isEmpty() ?
"defaultMonitorName" : name; + return streamSlowMonitors + .computeIfAbsent(key, k -> new StreamSlowMonitor(conf, key, this)); + } + + public Map getExcludeDNs() { + return excludeDNsCache.asMap(); + } + + @Override + public void onConfigurationChange(Configuration conf) { + for (StreamSlowMonitor monitor : streamSlowMonitors.values()) { + monitor.onConfigurationChange(conf); + } + this.excludeDNsCache = CacheBuilder.newBuilder().expireAfterWrite( + this.conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, DEFAULT_WAL_EXCLUDE_DATANODE_TTL), + TimeUnit.HOURS).maximumSize(this.conf + .getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY, DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT)) + .build(); + } +} diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/monitor/StreamSlowMonitor.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/monitor/StreamSlowMonitor.java new file mode 100644 index 00000000000..7ee04f8eebd --- /dev/null +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/monitor/StreamSlowMonitor.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.asyncfs.monitor; + +import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.DEFAULT_WAL_EXCLUDE_DATANODE_TTL; +import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT; +import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.WAL_EXCLUDE_DATANODE_TTL_KEY; +import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY; + +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; +import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; + +/** + * Class for monitor the wal file flush performance. + * Each active wal file has a StreamSlowMonitor. + */ +@InterfaceAudience.Private +public class StreamSlowMonitor implements ConfigurationObserver { + private static final Logger LOG = LoggerFactory.getLogger(StreamSlowMonitor.class); + + /** + * Configure for the min count for a datanode detected slow. 
+ * If a datanode is detected as slow this many times, it will be added to the exclude + * datanode cache by {@link ExcludeDatanodeManager#tryAddExcludeDN(DatanodeInfo, String)} + * of this regionserver. + */ + private static final String WAL_SLOW_DETECT_MIN_COUNT_KEY = + "hbase.regionserver.async.wal.min.slow.detect.count"; + private static final int DEFAULT_WAL_SLOW_DETECT_MIN_COUNT = 3; + + /** + * Configure the TTL of the slow-packet records kept for a datanode. + */ + private static final String WAL_SLOW_DETECT_DATA_TTL_KEY = + "hbase.regionserver.async.wal.slow.detect.data.ttl.ms"; + private static final long DEFAULT_WAL_SLOW_DETECT_DATA_TTL = 10 * 60 * 1000; // 10min in ms + + /** + * Configure the minimum packet data length for the speed check. + * Packets whose data length is smaller than this value are checked for slowness by + * processing time, while packets whose data length is larger than this value are checked + * by flushing speed. + */ + private static final String DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY = + "hbase.regionserver.async.wal.datanode.slow.check.speed.packet.data.length.min"; + private static final long DEFAULT_DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH = + 64 * 1024; //64KB + + /** + * Configure the slow packet process time, i.e. the duration from send to ACK. + * The processing time check applies to packets whose data length is smaller than + * {@link StreamSlowMonitor#DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY}. + */ + public static final String DATANODE_SLOW_PACKET_PROCESS_TIME_KEY = + "hbase.regionserver.async.wal.datanode.slow.packet.process.time.millis"; + private static final long DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME = 6000; // 6s in ms + + /** + * Configure the minimum flush speed for large packets (as defined by + * {@link StreamSlowMonitor#DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY}). + * e.g. If the configured slow packet process time is smaller than 10s, then here 20KB/s means + * 64KB should be processed in less than 3.2s.
+ */ + private static final String DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY = + "hbase.regionserver.async.wal.datanode.slow.packet.speed.min.kbs"; + private static final double DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED = 20; // 20KB/s + + private final String name; + // this is a map of datanodeInfo->queued slow PacketAckData + private final LoadingCache<DatanodeInfo, Deque<PacketAckData>> datanodeSlowDataQueue; + private final ExcludeDatanodeManager excludeDatanodeManager; + + private int minSlowDetectCount; + private long slowDataTtl; + private long slowPacketAckMs; + private double minPacketFlushSpeedKBs; + private long minLengthForSpeedCheck; + + public StreamSlowMonitor(Configuration conf, String name, + ExcludeDatanodeManager excludeDatanodeManager) { + setConf(conf); + this.name = name; + this.excludeDatanodeManager = excludeDatanodeManager; + this.datanodeSlowDataQueue = CacheBuilder.newBuilder() + .maximumSize(conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY, + DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT)) + .expireAfterWrite(conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, + DEFAULT_WAL_EXCLUDE_DATANODE_TTL), TimeUnit.HOURS) + .build(new CacheLoader<DatanodeInfo, Deque<PacketAckData>>() { + @Override + public Deque<PacketAckData> load(DatanodeInfo key) throws Exception { + return new ConcurrentLinkedDeque<>(); + } + }); + LOG.info("New stream slow monitor {}", this.name); + } + + public static StreamSlowMonitor create(Configuration conf, String name) { + return new StreamSlowMonitor(conf, name, new ExcludeDatanodeManager(conf)); + } + + /** + * Check if the packet process time shows that the relevant datanode is a slow node. + * @param datanodeInfo the datanode that processed the packet + * @param packetDataLen the data length of the packet (in bytes) + * @param processTimeMs the process time (in ms) of the packet on the datanode + * @param lastAckTimestamp the last acked timestamp of the packet on another datanode + * @param unfinished the number of replicas that have not yet acked the packet + */ + public void checkProcessTimeAndSpeed(DatanodeInfo datanodeInfo, long packetDataLen, + long processTimeMs, long lastAckTimestamp, int unfinished) { + long current = EnvironmentEdgeManager.currentTime(); + // There are two conditions used to determine whether a datanode is slow: + // 1. For a small packet, we simply apply a time limit, without considering + // the size of the packet. + // 2. For a large packet, we calculate the flush speed and check whether it is too slow. + boolean slow = (packetDataLen <= minLengthForSpeedCheck && processTimeMs > slowPacketAckMs) || ( + packetDataLen > minLengthForSpeedCheck + && (double) packetDataLen / processTimeMs < minPacketFlushSpeedKBs); + if (slow) { + // Check whether there is a large ack timestamp gap between replicas, + // to avoid misjudgments caused by GC STW pauses.
+ if ((lastAckTimestamp > 0 && current - lastAckTimestamp > slowPacketAckMs / 2) || ( + lastAckTimestamp <= 0 && unfinished == 0)) { + LOG.info("Slow datanode: {}, data length={}, duration={}ms, unfinishedReplicas={}, " + + "lastAckTimestamp={}, monitor name: {}", datanodeInfo, packetDataLen, processTimeMs, + unfinished, lastAckTimestamp, this.name); + if (addSlowAckData(datanodeInfo, packetDataLen, processTimeMs)) { + excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "slow packet ack"); + } + } + } + } + + @Override + public void onConfigurationChange(Configuration conf) { + setConf(conf); + } + + private boolean addSlowAckData(DatanodeInfo datanodeInfo, long dataLength, long processTime) { + Deque slowDNQueue = datanodeSlowDataQueue.getUnchecked(datanodeInfo); + long current = EnvironmentEdgeManager.currentTime(); + while (!slowDNQueue.isEmpty() && (current - slowDNQueue.getFirst().getTimestamp() > slowDataTtl + || slowDNQueue.size() >= minSlowDetectCount)) { + slowDNQueue.removeFirst(); + } + slowDNQueue.addLast(new PacketAckData(dataLength, processTime)); + return slowDNQueue.size() >= minSlowDetectCount; + } + + private void setConf(Configuration conf) { + this.minSlowDetectCount = conf.getInt(WAL_SLOW_DETECT_MIN_COUNT_KEY, + DEFAULT_WAL_SLOW_DETECT_MIN_COUNT); + this.slowDataTtl = conf.getLong(WAL_SLOW_DETECT_DATA_TTL_KEY, DEFAULT_WAL_SLOW_DETECT_DATA_TTL); + this.slowPacketAckMs = conf.getLong(DATANODE_SLOW_PACKET_PROCESS_TIME_KEY, + DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME); + this.minLengthForSpeedCheck = conf.getLong( + DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY, + DEFAULT_DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH); + this.minPacketFlushSpeedKBs = conf.getDouble(DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY, + DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED); + } + + public ExcludeDatanodeManager getExcludeDatanodeManager() { + return excludeDatanodeManager; + } + + private static class PacketAckData { + private final long dataLength; + private final long processTime; + private final long timestamp; + + public PacketAckData(long dataLength, long processTime) { + this.dataLength = dataLength; + this.processTime = processTime; + this.timestamp = EnvironmentEdgeManager.currentTime(); + } + + public long getDataLength() { + return dataLength; + } + + public long getProcessTime() { + return processTime; + } + + public long getTimestamp() { + return timestamp; + } + } +} diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestExcludeDatanodeManager.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestExcludeDatanodeManager.java new file mode 100644 index 00000000000..a3da52ef335 --- /dev/null +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestExcludeDatanodeManager.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.asyncfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ SmallTests.class }) +public class TestExcludeDatanodeManager { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestExcludeDatanodeManager.class); + + @Test + public void testExcludeSlowDNBySpeed() { + Configuration conf = HBaseConfiguration.create(); + ExcludeDatanodeManager excludeDatanodeManager = new ExcludeDatanodeManager(conf); + StreamSlowMonitor streamSlowDNsMonitor = + excludeDatanodeManager.getStreamSlowMonitor("testMonitor"); + assertEquals(0, excludeDatanodeManager.getExcludeDNs().size()); + DatanodeInfo datanodeInfo = + new DatanodeInfo.DatanodeInfoBuilder().setIpAddr("0.0.0.0").setHostName("hostname1") + .setDatanodeUuid("uuid1").setXferPort(111).setInfoPort(222).setInfoSecurePort(333) + .setIpcPort(444).setNetworkLocation("location1").build(); + streamSlowDNsMonitor + .checkProcessTimeAndSpeed(datanodeInfo, 100000, 5100, + System.currentTimeMillis() - 5100, 0); + streamSlowDNsMonitor + .checkProcessTimeAndSpeed(datanodeInfo, 100000, 5100, + System.currentTimeMillis() - 5100, 0); + streamSlowDNsMonitor + .checkProcessTimeAndSpeed(datanodeInfo, 100000, 5100, + System.currentTimeMillis() - 5100, 0); + assertEquals(1, excludeDatanodeManager.getExcludeDNs().size()); + assertTrue(excludeDatanodeManager.getExcludeDNs().containsKey(datanodeInfo)); + } + + @Test + public void testExcludeSlowDNByProcessTime() { + Configuration conf = HBaseConfiguration.create(); + ExcludeDatanodeManager excludeDatanodeManager = new ExcludeDatanodeManager(conf); + StreamSlowMonitor streamSlowDNsMonitor = + excludeDatanodeManager.getStreamSlowMonitor("testMonitor"); + assertEquals(0, excludeDatanodeManager.getExcludeDNs().size()); + DatanodeInfo datanodeInfo = + new DatanodeInfo.DatanodeInfoBuilder().setIpAddr("0.0.0.0").setHostName("hostname1") + .setDatanodeUuid("uuid1").setXferPort(111).setInfoPort(222).setInfoSecurePort(333) + .setIpcPort(444).setNetworkLocation("location1").build(); + streamSlowDNsMonitor + .checkProcessTimeAndSpeed(datanodeInfo, 5000, 7000, + System.currentTimeMillis() - 7000, 0); + streamSlowDNsMonitor + .checkProcessTimeAndSpeed(datanodeInfo, 5000, 7000, + System.currentTimeMillis() - 7000, 0); + streamSlowDNsMonitor + .checkProcessTimeAndSpeed(datanodeInfo, 5000, 7000, + System.currentTimeMillis() - 7000, 0); + assertEquals(1, excludeDatanodeManager.getExcludeDNs().size()); + assertTrue(excludeDatanodeManager.getExcludeDNs().containsKey(datanodeInfo)); + } +} diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 03ff1ee7753..8533d38bae0 100644 --- 
a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -39,6 +39,9 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -78,6 +81,8 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { private static int READ_TIMEOUT_MS = 2000; + private static StreamSlowMonitor MONITOR; + @Rule public TestName name = new TestName(); @@ -88,6 +93,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { FS = CLUSTER.getFileSystem(); EVENT_LOOP_GROUP = new NioEventLoopGroup(); CHANNEL_CLASS = NioSocketChannel.class; + MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor"); } @AfterClass @@ -133,7 +139,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); writeAndVerify(FS, f, out); } @@ -142,7 +148,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); out.write(b, 0, b.length); @@ -171,7 +177,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); Thread.sleep(READ_TIMEOUT_MS * 2); // the connection to datanode should still alive. 
writeAndVerify(FS, f, out); @@ -186,7 +192,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try { FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, - FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); fail("should fail with parent does not exist"); } catch (RemoteException e) { LOG.info("expected exception caught", e); @@ -209,7 +215,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { Path f = new Path("/test"); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, - f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) { + f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) { // should exclude the dead dn when retry so here we only have 2 DNs in pipeline assertEquals(2, output.getPipeline().length); } finally { @@ -217,12 +223,42 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { } } + @Test + public void testExcludeFailedConnectToDatanode() + throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, + InvocationTargetException, InterruptedException, NoSuchFieldException { + Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer"); + xceiverServerDaemonField.setAccessible(true); + Class xceiverServerClass = + Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer"); + Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers"); + numPeersMethod.setAccessible(true); + // make one datanode broken + DataNodeProperties dnProp = CLUSTER.stopDataNode(0); + Path f = new Path("/test"); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + ExcludeDatanodeManager excludeDatanodeManager = + new ExcludeDatanodeManager(HBaseConfiguration.create()); + StreamSlowMonitor streamSlowDNsMonitor = + excludeDatanodeManager.getStreamSlowMonitor("testMonitor"); + assertEquals(0, excludeDatanodeManager.getExcludeDNs().size()); + try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, + f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, + CHANNEL_CLASS, streamSlowDNsMonitor)) { + // should exclude the dead dn when retry so here we only have 2 DNs in pipeline + assertEquals(2, output.getPipeline().length); + assertEquals(1, excludeDatanodeManager.getExcludeDNs().size()); + } finally { + CLUSTER.restartDataNode(dnProp); + } + } + @Test public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS); + false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR); byte[] b = new byte[50 * 1024 * 1024]; ThreadLocalRandom.current().nextBytes(b); out.write(b); diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java index 48c6a6799ca..eff8d8a86b7 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java +++ 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseCommonTestingUtil; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -49,10 +50,13 @@ public class TestLocalAsyncOutput { private static final HBaseCommonTestingUtil TEST_UTIL = new HBaseCommonTestingUtil(); + private static StreamSlowMonitor MONITOR; + @AfterClass public static void tearDownAfterClass() throws IOException { TEST_UTIL.cleanupTestDir(); GROUP.shutdownGracefully(); + MONITOR = StreamSlowMonitor.create(TEST_UTIL.getConfiguration(), "testMonitor"); } @Test @@ -61,7 +65,7 @@ public class TestLocalAsyncOutput { Path f = new Path(TEST_UTIL.getDataTestDir(), "test"); FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration()); AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true, - fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS); + fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out); } } diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index 3a02c5e28af..e7fce27d60c 100644 --- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -24,7 +24,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY import java.io.File; import java.io.IOException; -import java.lang.reflect.Method; import java.net.BindException; import java.net.URI; import java.util.ArrayList; @@ -40,6 +39,7 @@ import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.security.HBaseKerberosUtils; import org.apache.hadoop.hbase.security.SecurityConstants; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -102,6 +102,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { private static String TEST_KEY_NAME = "test_key"; + private static StreamSlowMonitor MONITOR; + @Rule public TestName name = new TestName(); @@ -187,6 +189,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(), PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm()); HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class); + MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor"); } @AfterClass @@ -252,7 +255,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase { private void test(Path file) throws IOException, InterruptedException, ExecutionException { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput 
out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file, - true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index 3b84488d1dc..743369f2175 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Cipher; import org.apache.hadoop.hbase.io.crypto.Encryption; @@ -47,7 +48,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; - import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; @@ -169,7 +169,8 @@ public abstract class AbstractProtobufLogWriter { } public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, - long blocksize) throws IOException, StreamLacksCapabilityException { + long blocksize, StreamSlowMonitor monitor) throws IOException, + StreamLacksCapabilityException { this.conf = conf; boolean doCompress = initializeCompressionContext(conf, path); this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); @@ -177,7 +178,7 @@ public abstract class AbstractProtobufLogWriter { short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", CommonFSUtils.getDefaultReplication(fs, path)); - initOutput(fs, path, overwritable, bufferSize, replication, blocksize); + initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor); boolean doTagCompress = doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); @@ -266,7 +267,8 @@ public abstract class AbstractProtobufLogWriter { } protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize) throws IOException, StreamLacksCapabilityException; + short replication, long blockSize, StreamSlowMonitor monitor) + throws IOException, StreamLacksCapabilityException; /** * return the file length after written. 
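[Editor's illustrative sketch, not part of the patch.] To make the slow-detection rule added in StreamSlowMonitor#checkProcessTimeAndSpeed above concrete, here is a minimal standalone Java sketch of the two conditions; the thresholds mirror the defaults defined above, and the class and method names are made up for illustration only:

public final class SlowPacketCheckSketch {
  // Defaults mirrored from StreamSlowMonitor above.
  private static final long MIN_LENGTH_FOR_SPEED_CHECK = 64 * 1024; // bytes
  private static final long SLOW_PACKET_ACK_MS = 6000;              // ms
  private static final double MIN_PACKET_FLUSH_SPEED_KBS = 20;      // KB/s

  // Small packets are judged by ack latency alone; large packets by effective
  // flush speed (bytes per millisecond, which the monitor treats as KB/s).
  static boolean isSlow(long packetDataLen, long processTimeMs) {
    if (packetDataLen <= MIN_LENGTH_FOR_SPEED_CHECK) {
      return processTimeMs > SLOW_PACKET_ACK_MS;
    }
    return (double) packetDataLen / processTimeMs < MIN_PACKET_FLUSH_SPEED_KBS;
  }

  public static void main(String[] args) {
    System.out.println(isSlow(5000, 7000));    // true: small packet, 7s > 6s limit
    System.out.println(isSlow(100000, 5100));  // true: ~19.6 KB/s < 20 KB/s
    System.out.println(isSlow(100000, 1000));  // false: ~100 KB/s
  }
}

With the default minimum detect count of 3, repeated slow verdicts within the data TTL lead to ExcludeDatanodeManager#tryAddExcludeDN, which is what TestExcludeDatanodeManager above exercises.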
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 49c9e44885d..dca64b9f77e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKeyImpl; @@ -200,22 +201,26 @@ public class AsyncFSWAL extends AbstractFSWAL { private final int waitOnShutdownInSeconds; + private final StreamSlowMonitor streamSlowMonitor; + public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class channelClass) throws FailedLogCloseException, IOException { this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, - eventLoopGroup, channelClass); + eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, "monitorForSuffix")); } public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, - Class channelClass) throws FailedLogCloseException, IOException { + Class channelClass, StreamSlowMonitor monitor) + throws FailedLogCloseException, IOException { super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); this.eventLoopGroup = eventLoopGroup; this.channelClass = channelClass; + this.streamSlowMonitor = monitor; Supplier hasConsumerTask; if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) { this.consumeExecutor = eventLoopGroup.next(); @@ -712,7 +717,7 @@ public class AsyncFSWAL extends AbstractFSWAL { protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException { return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize, - eventLoopGroup, channelClass); + eventLoopGroup, channelClass, streamSlowMonitor); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index e834d654310..0ab04076e78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.io.ByteBufferWriter; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -176,9 +177,10 @@ public 
class AsyncProtobufLogWriter extends AbstractProtobufLogWriter @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize) throws IOException, StreamLacksCapabilityException { + short replication, long blockSize, StreamSlowMonitor monitor) throws IOException, + StreamLacksCapabilityException { this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, - blockSize, eventLoopGroup, channelClass); + blockSize, eventLoopGroup, channelClass, monitor); this.asyncOutputWrapper = new OutputStreamWrapper(output); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index fbcfc4c0ee8..c75998e1f86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -20,13 +20,13 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStreamBuilder; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.util.AtomicUtils; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; @@ -106,7 +106,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize) throws IOException, StreamLacksCapabilityException { + short replication, long blockSize, StreamSlowMonitor monitor) throws IOException, + StreamLacksCapabilityException { FSDataOutputStreamBuilder builder = fs .createFile(path) .overwrite(overwritable) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index 8d46bad44f8..06729e2356a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; @@ -57,8 +58,8 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that * meet the needs of the given Writer implementation. 
*/ - void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize) - throws IOException, CommonFSUtils.StreamLacksCapabilityException; + void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize, + StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException; } private EventLoopGroup eventLoopGroup; @@ -68,10 +69,10 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { @Override protected AsyncFSWAL createWAL() throws IOException { return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable, - CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId), - getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, - META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup, - channelClass); + CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId), + getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, + META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup, + channelClass, factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId)); } @Override @@ -89,7 +90,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { boolean overwritable, EventLoopGroup eventLoopGroup, Class channelClass) throws IOException { return createAsyncWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path), - eventLoopGroup, channelClass); + eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, path.getName())); } /** @@ -97,14 +98,14 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { */ public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup, - Class channelClass) throws IOException { + Class channelClass, StreamSlowMonitor monitor) throws IOException { // Configuration already does caching for the Class lookup. Class logWriterClass = conf.getClass( WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class); try { AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class) .newInstance(eventLoopGroup, channelClass); - writer.init(fs, path, conf, overwritable, blocksize); + writer.init(fs, path, conf, overwritable, blocksize, monitor); return writer; } catch (Exception e) { if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index e64d70f5098..95d6a24b5d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; @@ -48,8 +49,8 @@ public class FSHLogProvider extends AbstractFSWALProvider { * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that * meet the needs of the given Writer implementation. 
    */
-  void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize)
-    throws IOException, CommonFSUtils.StreamLacksCapabilityException;
+  void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize,
+    StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException;
 }
 /**
@@ -76,7 +77,8 @@ public class FSHLogProvider extends AbstractFSWALProvider {
     try {
       writer = logWriterClass.getDeclaredConstructor().newInstance();
       FileSystem rootFs = FileSystem.get(path.toUri(), conf);
-      writer.init(rootFs, path, conf, overwritable, blocksize);
+      writer.init(rootFs, path, conf, overwritable, blocksize,
+        StreamSlowMonitor.create(conf, path.getName()));
       return writer;
     } catch (Exception e) {
       if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index a5ceac9e03e..cff3154626b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -21,12 +21,12 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -105,6 +105,8 @@ public class WALFactory {
   private final Configuration conf;
+  private final ExcludeDatanodeManager excludeDatanodeManager;
+
   // Used for the singleton WALFactory, see below.
   private WALFactory(Configuration conf) {
     // this code is duplicated here so we can keep our members final.
@@ -121,6 +123,7 @@ public class WALFactory {
     provider = null;
     factoryId = SINGLETON_ID;
     this.abortable = null;
+    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
   }
   Providers getDefaultProvider() {
@@ -197,6 +200,7 @@
       AbstractFSWALProvider.Reader.class);
     this.conf = conf;
     this.factoryId = factoryId;
+    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
     this.abortable = abortable;
     // end required early initialization
     if (conf.getBoolean(WAL_ENABLED, true)) {
@@ -502,4 +506,8 @@ public class WALFactory {
   public final WALProvider getMetaWALProvider() {
     return this.metaProvider.get();
   }
+
+  public ExcludeDatanodeManager getExcludeDatanodeManager() {
+    return excludeDatanodeManager;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 4d2213517d1..4c454d96e89 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
@@ -1179,7 +1180,8 @@ public abstract class AbstractTestWALReplay {
       throws IOException, StreamLacksCapabilityException {
     fs.mkdirs(file.getParent());
     ProtobufLogWriter writer = new ProtobufLogWriter();
-    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file));
+    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file),
+      StreamSlowMonitor.create(conf, "testMonitor"));
     for (FSWALEntry entry : entries) {
       writer.append(entry);
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
index bcf7e62bdbe..9b803dc4f7a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.replication.RecoverStandbyProcedure;
@@ -161,7 +162,9 @@ public class TestRecoverStandbyProcedure {
     for (int i = 0; i < WAL_NUMBER; i++) {
       try (ProtobufLogWriter writer = new ProtobufLogWriter()) {
         Path wal = new Path(peerRemoteWALDir, "srv1,8888." + i + ".syncrep");
-        writer.init(fs, wal, conf, true, WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir));
+        writer.init(fs, wal, conf, true,
+          WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir),
+          StreamSlowMonitor.create(conf, "defaultMonitor"));
         List entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT);
         for (Entry entry : entries) {
           writer.append(entry);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index ecbd0432be1..88ccbc7e2ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
 // imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -212,7 +213,8 @@ public class IOTestProvider implements WALProvider {
     LOG.info("creating new writer instance.");
     final ProtobufLogWriter writer = new IOTestWriter();
     try {
-      writer.init(fs, path, conf, false, this.blocksize);
+      writer.init(fs, path, conf, false, this.blocksize,
+        StreamSlowMonitor.create(conf, path.getName()));
     } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
       throw new IOException("Can't create writer instance because underlying FileSystem "
         + "doesn't support needed stream capabilities.", exception);
@@ -240,7 +242,8 @@ public class IOTestProvider implements WALProvider {
     @Override
     public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
-      long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException {
+      long blocksize, StreamSlowMonitor monitor) throws IOException,
+      CommonFSUtils.StreamLacksCapabilityException {
       Collection operations = conf.getStringCollection(ALLOWED_OPERATIONS);
       if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
         doAppends = doSyncs = true;
@@ -252,7 +255,7 @@ public class IOTestProvider implements WALProvider {
       }
       LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled")
         + " and syncs " + (doSyncs ? "enabled" : "disabled"));
-      super.init(fs, path, conf, overwritable, blocksize);
+      super.init(fs, path, conf, overwritable, blocksize, monitor);
     }

     @Override