From 4cdda4ba87f24a172fd1b4395d59643ea5554df0 Mon Sep 17 00:00:00 2001 From: Hanisha Koneru Date: Wed, 5 Apr 2017 07:35:09 -0700 Subject: [PATCH] HDFS-11545. Propagate DataNode's slow disks info to the NameNode via Heartbeat. Contributed by Hanisha Koneru. --- .../hdfs/server/protocol/SlowDiskReports.java | 126 ++++++++++++++++++ ...atanodeProtocolClientSideTranslatorPB.java | 7 +- ...atanodeProtocolServerSideTranslatorPB.java | 3 +- .../hadoop/hdfs/protocolPB/PBHelper.java | 68 ++++++++++ .../blockmanagement/DatanodeManager.java | 3 +- .../blockmanagement/SlowPeerTracker.java | 2 +- .../hdfs/server/datanode/BPServiceActor.java | 32 +++-- .../datanode/metrics/DataNodeDiskMetrics.java | 35 +++-- .../hdfs/server/namenode/FSNamesystem.java | 6 +- .../server/namenode/NameNodeRpcServer.java | 6 +- .../server/protocol/DatanodeProtocol.java | 3 +- .../src/main/proto/DatanodeProtocol.proto | 15 +++ .../hadoop/hdfs/protocolPB/TestPBHelper.java | 28 ++++ .../TestNameNodePrunesMissingStorages.java | 4 +- .../datanode/InternalDataNodeTestUtils.java | 4 +- .../server/datanode/TestBPOfferService.java | 4 +- .../server/datanode/TestBlockRecovery.java | 4 +- .../datanode/TestBpServiceActorScheduler.java | 20 +-- .../server/datanode/TestDataNodeLifeline.java | 8 +- .../server/datanode/TestDataNodeMXBean.java | 2 +- .../TestDatanodeProtocolRetryPolicy.java | 4 +- .../server/datanode/TestFsDatasetCache.java | 4 +- .../server/datanode/TestStorageReport.java | 4 +- .../namenode/NNThroughputBenchmark.java | 7 +- .../hdfs/server/namenode/NameNodeAdapter.java | 3 +- .../server/namenode/TestDeadDatanode.java | 4 +- 26 files changed, 340 insertions(+), 66 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java new file mode 100644 index 00000000000..d548eeb3c99 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowDiskReports.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * A class that allows a DataNode to communicate information about all
+ * its disks that appear to be slow.
+ *
+ * The wire representation of this structure is a list of
+ * SlowDiskReportProto messages.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class SlowDiskReports {
+  /**
+   * A map from the DataNode disk's basePath to its mean metadata op latency,
+   * mean read io latency and mean write io latency.
+   *
+   * The NameNode must not attempt to interpret the mean latencies
+   * beyond exposing them as a diagnostic, e.g. via metrics. Also, comparing
+   * latencies across reports from different DataNodes may not be
+   * meaningful and must be avoided.
+   */
+  @Nonnull
+  private final Map<String, Map<DiskOp, Double>> slowDisks;
+
+  /**
+   * An object representing a SlowDiskReports with no entries. Should
+   * be used instead of null or creating new objects when there are
+   * no slow disks to report.
+   */
+  public static final SlowDiskReports EMPTY_REPORT =
+      new SlowDiskReports(ImmutableMap.<String, Map<DiskOp, Double>>of());
+
+  private SlowDiskReports(Map<String, Map<DiskOp, Double>> slowDisks) {
+    this.slowDisks = slowDisks;
+  }
+
+  public static SlowDiskReports create(
+      @Nullable Map<String, Map<DiskOp, Double>> slowDisks) {
+    if (slowDisks == null || slowDisks.isEmpty()) {
+      return EMPTY_REPORT;
+    }
+    return new SlowDiskReports(slowDisks);
+  }
+
+  public Map<String, Map<DiskOp, Double>> getSlowDisks() {
+    return slowDisks;
+  }
+
+  public boolean haveSlowDisks() {
+    return slowDisks.size() > 0;
+  }
+
+  /**
+   * Return true if the two objects represent the same set of slow disk
+   * entries. Primarily for unit testing convenience.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof SlowDiskReports)) {
+      return false;
+    }
+
+    SlowDiskReports that = (SlowDiskReports) o;
+
+    if (this.slowDisks.size() != that.slowDisks.size()) {
+      return false;
+    }
+
+    if (!this.slowDisks.keySet().containsAll(that.slowDisks.keySet()) ||
+        !that.slowDisks.keySet().containsAll(this.slowDisks.keySet())) {
+      return false;
+    }
+
+    for (String disk : this.slowDisks.keySet()) {
+      if (!this.slowDisks.get(disk).equals(that.slowDisks.get(disk))) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return slowDisks.hashCode();
+  }
+
+  /**
+   * Lists the types of operations on which disk latencies are measured.
+ */ + public enum DiskOp { + METADATA, + READ, + WRITE + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 1794cbdef9f..fe8fff7776e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; @@ -136,7 +137,8 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, int xmitsInProgress, int xceiverCount, int failedVolumes, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, - @Nonnull SlowPeerReports slowPeers) throws IOException { + @Nonnull SlowPeerReports slowPeers, + @Nonnull SlowDiskReports slowDisks) throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) @@ -156,6 +158,9 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, if (slowPeers.haveSlowPeers()) { builder.addAllSlowPeers(PBHelper.convertSlowPeerInfo(slowPeers)); } + if (slowDisks.haveSlowDisks()) { + builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks)); + } HeartbeatResponseProto resp; try { resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index cdfcf4cb6b9..f3528bbf13f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -121,7 +121,8 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller, request.getXmitsInProgress(), request.getXceiverCount(), request.getFailedVolumes(), volumeFailureSummary, request.getRequestFullBlockReportLease(), - PBHelper.convertSlowPeerInfo(request.getSlowPeersList())); + PBHelper.convertSlowPeerInfo(request.getSlowPeersList()), + PBHelper.convertSlowDiskInfo(request.getSlowDisksList())); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 5ee7847113b..dbdbfc412bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -46,6 +46,8 @@
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos
+    .SlowDiskReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
@@ -104,6 +106,7 @@
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -832,6 +835,71 @@ public static SlowPeerReports convertSlowPeerInfo(
     return SlowPeerReports.create(slowPeersMap);
   }
 
+  public static List<SlowDiskReportProto> convertSlowDiskInfo(
+      SlowDiskReports slowDisks) {
+    if (slowDisks.getSlowDisks().size() == 0) {
+      return Collections.emptyList();
+    }
+
+    List<SlowDiskReportProto> slowDiskInfoProtos =
+        new ArrayList<>(slowDisks.getSlowDisks().size());
+    for (Map.Entry<String, Map<SlowDiskReports.DiskOp, Double>> entry :
+        slowDisks.getSlowDisks().entrySet()) {
+      SlowDiskReportProto.Builder builder = SlowDiskReportProto.newBuilder();
+      builder.setBasePath(entry.getKey());
+      Map<SlowDiskReports.DiskOp, Double> value = entry.getValue();
+      if (value.get(SlowDiskReports.DiskOp.METADATA) != null) {
+        builder.setMeanMetadataOpLatency(value.get(
+            SlowDiskReports.DiskOp.METADATA));
+      }
+      if (value.get(SlowDiskReports.DiskOp.READ) != null) {
+        builder.setMeanReadIoLatency(value.get(
+            SlowDiskReports.DiskOp.READ));
+      }
+      if (value.get(SlowDiskReports.DiskOp.WRITE) != null) {
+        builder.setMeanWriteIoLatency(value.get(
+            SlowDiskReports.DiskOp.WRITE));
+      }
+      slowDiskInfoProtos.add(builder.build());
+    }
+
+    return slowDiskInfoProtos;
+  }
+
+  public static SlowDiskReports convertSlowDiskInfo(
+      List<SlowDiskReportProto> slowDiskProtos) {
+
+    // No slow disks, or possibly an older DataNode.
+    if (slowDiskProtos == null || slowDiskProtos.size() == 0) {
+      return SlowDiskReports.EMPTY_REPORT;
+    }
+
+    Map<String, Map<SlowDiskReports.DiskOp, Double>> slowDisksMap =
+        new HashMap<>(slowDiskProtos.size());
+    for (SlowDiskReportProto proto : slowDiskProtos) {
+      if (!proto.hasBasePath()) {
+        // The disk basePath should be reported.
+        continue;
+      }
+      Map<SlowDiskReports.DiskOp, Double> latencyMap = new HashMap<>();
+      if (proto.hasMeanMetadataOpLatency()) {
+        latencyMap.put(SlowDiskReports.DiskOp.METADATA,
+            proto.getMeanMetadataOpLatency());
+      }
+      if (proto.hasMeanReadIoLatency()) {
+        latencyMap.put(SlowDiskReports.DiskOp.READ,
+            proto.getMeanReadIoLatency());
+      }
+      if (proto.hasMeanWriteIoLatency()) {
+        latencyMap.put(SlowDiskReports.DiskOp.WRITE,
+            proto.getMeanWriteIoLatency());
+      }
+
+      slowDisksMap.put(proto.getBasePath(), latencyMap);
+    }
+    return SlowDiskReports.create(slowDisksMap);
+  }
+
   public static JournalInfo convert(JournalInfoProto info) {
     int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
     int nsID = info.hasNamespaceID() ?
info.getNamespaceID() : 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 7f8afa9ec76..04ba5aacce2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1508,7 +1508,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, long cacheCapacity, long cacheUsed, int xceiverCount, int maxTransfers, int failedVolumes, VolumeFailureSummary volumeFailureSummary, - @Nonnull SlowPeerReports slowPeers) throws IOException { + @Nonnull SlowPeerReports slowPeers, + @Nonnull SlowDiskReports slowDisks) throws IOException { final DatanodeDescriptor nodeinfo; try { nodeinfo = getDatanode(nodeReg); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java index c8a63488d09..98d09274fe5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java @@ -58,7 +58,7 @@ public class SlowPeerTracker { /** * Time duration after which a report is considered stale. This is - * set to DFS_DATANODE_SLOW_PEER_REPORT_INTERVAL_KEY * 3 i.e. + * set to DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY * 3 i.e. * maintained for at least two successive reports. */ private final long reportValidityMs; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index ec8c79be1ac..235d30f6301 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -499,11 +500,15 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease) .getVolumeFailureSummary(); int numFailedVolumes = volumeFailureSummary != null ? volumeFailureSummary.getFailedStorageLocations().length : 0; - final boolean slowPeersReportDue = scheduler.isSlowPeersReportDue(now); + final boolean outliersReportDue = scheduler.isOutliersReportDue(now); final SlowPeerReports slowPeers = - slowPeersReportDue && dn.getPeerMetrics() != null ? + outliersReportDue && dn.getPeerMetrics() != null ? SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) : SlowPeerReports.EMPTY_REPORT; + final SlowDiskReports slowDisks = + outliersReportDue && dn.getDiskMetrics() != null ? 
+ SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) : + SlowDiskReports.EMPTY_REPORT; HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration, reports, dn.getFSDataset().getCacheCapacity(), @@ -513,11 +518,12 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease) numFailedVolumes, volumeFailureSummary, requestBlockReportLease, - slowPeers); + slowPeers, + slowDisks); - if (slowPeersReportDue) { + if (outliersReportDue) { // If the report was due and successfully sent, schedule the next one. - scheduler.scheduleNextSlowPeerReport(); + scheduler.scheduleNextOutlierReport(); } return response; } @@ -1097,7 +1103,7 @@ static class Scheduler { boolean resetBlockReportTime = true; @VisibleForTesting - volatile long nextSlowPeersReportTime = monotonicNow(); + volatile long nextOutliersReportTime = monotonicNow(); private final AtomicBoolean forceFullBlockReport = new AtomicBoolean(false); @@ -1105,14 +1111,14 @@ static class Scheduler { private final long heartbeatIntervalMs; private final long lifelineIntervalMs; private final long blockReportIntervalMs; - private final long slowPeersReportIntervalMs; + private final long outliersReportIntervalMs; Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs, - long blockReportIntervalMs, long slowPeersReportIntervalMs) { + long blockReportIntervalMs, long outliersReportIntervalMs) { this.heartbeatIntervalMs = heartbeatIntervalMs; this.lifelineIntervalMs = lifelineIntervalMs; this.blockReportIntervalMs = blockReportIntervalMs; - this.slowPeersReportIntervalMs = slowPeersReportIntervalMs; + this.outliersReportIntervalMs = outliersReportIntervalMs; scheduleNextLifeline(nextHeartbeatTime); } @@ -1145,8 +1151,8 @@ void updateLastBlockReportTime(long blockReportTime) { lastBlockReportTime = blockReportTime; } - void scheduleNextSlowPeerReport() { - nextSlowPeersReportTime = monotonicNow() + slowPeersReportIntervalMs; + void scheduleNextOutlierReport() { + nextOutliersReportTime = monotonicNow() + outliersReportIntervalMs; } long getLastHearbeatTime() { @@ -1175,8 +1181,8 @@ boolean isBlockReportDue(long curTime) { return nextBlockReportTime - curTime <= 0; } - boolean isSlowPeersReportDue(long curTime) { - return nextSlowPeersReportTime - curTime <= 0; + boolean isOutliersReportDue(long curTime) { + return nextOutliersReportTime - curTime <= 0; } void forceFullBlockReportNow() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java index b6d5cf0d2dd..a543c695b8e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,7 @@ public class DataNodeDiskMetrics { private volatile boolean shouldRun; private OutlierDetector slowDiskDetector; private Daemon slowDiskDetectionDaemon; - private volatile Map> + private volatile 
Map<String, Map<DiskOp, Double>>
       diskOutliersStats = Maps.newHashMap();
 
   public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs) {
@@ -144,13 +145,13 @@ private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats,
       diskOutliersSet.addAll(writeIoOutliers.keySet());
     }
 
-    Map<String, Map<DiskOutlierDetectionOp, Double>> diskStats =
+    Map<String, Map<DiskOp, Double>> diskStats =
         Maps.newHashMap();
     for (String disk : diskOutliersSet) {
-      Map<DiskOutlierDetectionOp, Double> diskStat = Maps.newHashMap();
-      diskStat.put(DiskOutlierDetectionOp.METADATA, metadataOpStats.get(disk));
-      diskStat.put(DiskOutlierDetectionOp.READ, readIoStats.get(disk));
-      diskStat.put(DiskOutlierDetectionOp.WRITE, writeIoStats.get(disk));
+      Map<DiskOp, Double> diskStat = Maps.newHashMap();
+      diskStat.put(DiskOp.METADATA, metadataOpStats.get(disk));
+      diskStat.put(DiskOp.READ, readIoStats.get(disk));
+      diskStat.put(DiskOp.WRITE, writeIoStats.get(disk));
       diskStats.put(disk, diskStat);
     }
 
@@ -158,17 +159,7 @@ private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats,
     LOG.debug("Updated disk outliers.");
   }
 
-  /**
-   * Lists the types of operations on which disk latencies are measured.
-   */
-  public enum DiskOutlierDetectionOp {
-    METADATA,
-    READ,
-    WRITE
-  }
-
-  public Map<String, Map<DiskOutlierDetectionOp, Double>> getDiskOutliersStats() {
+  public Map<String, Map<DiskOp, Double>> getDiskOutliersStats() {
     return diskOutliersStats;
   }
 
@@ -186,8 +177,12 @@ public void shutdownAndWait() {
    * Use only for testing.
    */
   @VisibleForTesting
-  public void addSlowDiskForTesting(String slowDiskPath) {
-    diskOutliersStats.put(slowDiskPath,
-        ImmutableMap.<DiskOutlierDetectionOp, Double>of());
+  public void addSlowDiskForTesting(String slowDiskPath,
+      Map<DiskOp, Double> latencies) {
+    if (latencies == null) {
+      diskOutliersStats.put(slowDiskPath, ImmutableMap.<DiskOp, Double>of());
+    } else {
+      diskOutliersStats.put(slowDiskPath, latencies);
+    }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 54907bd1641..c22a70a3cca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -91,6 +91,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
 import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.util.Time.now;
 import static org.apache.hadoop.util.Time.monotonicNow;
 import static org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics.TOPMETRICS_METRICS_SOURCE_NAME;
@@ -3625,7 +3626,8 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
       int xceiverCount, int xmitsInProgress, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
-      @Nonnull SlowPeerReports slowPeers) throws IOException {
+      @Nonnull SlowPeerReports slowPeers,
+      @Nonnull SlowDiskReports slowDisks) throws IOException {
     readLock();
     try {
       //get datanode commands
@@ -3634,7 +3636,7 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
           nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
           xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
-          slowPeers);
+          slowPeers, slowDisks);
       long blockReportLeaseId = 0;
       if (requestFullBlockReportLease) {
         blockReportLeaseId =
blockManager.requestBlockReportLeaseId(nodeReg); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 59a741e1293..c8c784e07c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -152,6 +152,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; @@ -1416,13 +1417,14 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg, int xmitsInProgress, int xceiverCount, int failedVolumes, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, - @Nonnull SlowPeerReports slowPeers) throws IOException { + @Nonnull SlowPeerReports slowPeers, + @Nonnull SlowDiskReports slowDisks) throws IOException { checkNNStartup(); verifyRequest(nodeReg); return namesystem.handleHeartbeat(nodeReg, report, dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary, requestFullBlockReportLease, - slowPeers); + slowPeers, slowDisks); } @Override // DatanodeProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index a15aea81e55..a8e3edd1ab0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -121,7 +121,8 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, int failedVolumes, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, - @Nonnull SlowPeerReports slowPeers) + @Nonnull SlowPeerReports slowPeers, + @Nonnull SlowDiskReports slowDisks) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 393cc43116a..0e4b2fb959c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -186,6 +186,7 @@ message VolumeFailureSummaryProto { * cacheUsed - amount of cache used * volumeFailureSummary - info about volume failures * slowPeers - info about peer DataNodes that are suspected to be slow. + * slowDisks - info about DataNode disks that are suspected to be slow. 
*/ message HeartbeatRequestProto { required DatanodeRegistrationProto registration = 1; // Datanode info @@ -198,6 +199,7 @@ message HeartbeatRequestProto { optional VolumeFailureSummaryProto volumeFailureSummary = 8; optional bool requestFullBlockReportLease = 9 [ default = false ]; repeated SlowPeerReportProto slowPeers = 10; + repeated SlowDiskReportProto slowDisks = 11; } /** @@ -392,6 +394,19 @@ message SlowPeerReportProto { optional double aggregateLatency = 2; } +/** + * Information about a single slow disk that may be reported by + * the DataNode to the NameNode as part of the heartbeat request. + * The message includes the disk's basePath, mean metadata op latency, + * mean read io latency and mean write io latency as observed by the DataNode. + */ +message SlowDiskReportProto { + optional string basePath = 1; + optional double meanMetadataOpLatency = 2; + optional double meanReadIoLatency = 3; + optional double meanWriteIoLatency = 4; +} + /** * Protocol used from datanode to the namenode * See the request and response for details of rpc call. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index 3280563c254..9333efc467a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.protocolPB; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; @@ -26,6 +27,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import com.google.common.collect.ImmutableMap; import org.apache.hadoop.fs.permission.AclEntry; @@ -700,4 +702,30 @@ public void testSlowPeerInfoPBHelper() { "Expected empty map:" + ", got map:" + slowPeersConverted2, slowPeersConverted2.equals(SlowPeerReports.EMPTY_REPORT)); } + + @Test + public void testSlowDiskInfoPBHelper() { + // Test with a map that has a few slow disk entries. 
+    final SlowDiskReports slowDisks = SlowDiskReports.create(
+        ImmutableMap.<String, Map<SlowDiskReports.DiskOp, Double>>of(
+            "disk1", ImmutableMap.of(SlowDiskReports.DiskOp.METADATA, 0.5),
+            "disk2", ImmutableMap.of(SlowDiskReports.DiskOp.READ, 1.0,
+                SlowDiskReports.DiskOp.WRITE, 1.0),
+            "disk3", ImmutableMap.of(SlowDiskReports.DiskOp.METADATA, 1.2,
+                SlowDiskReports.DiskOp.READ, 1.5,
+                SlowDiskReports.DiskOp.WRITE, 1.3)));
+    SlowDiskReports slowDisksConverted1 = PBHelper.convertSlowDiskInfo(
+        PBHelper.convertSlowDiskInfo(slowDisks));
+    assertTrue(
+        "Expected map:" + slowDisks + ", got map:" +
+            slowDisksConverted1.getSlowDisks(),
+        slowDisksConverted1.equals(slowDisks));
+
+    // Test with an empty map
+    SlowDiskReports slowDisksConverted2 = PBHelper.convertSlowDiskInfo(
+        PBHelper.convertSlowDiskInfo(SlowDiskReports.EMPTY_REPORT));
+    assertTrue(
+        "Expected empty map:" + ", got map:" + slowDisksConverted2,
+        slowDisksConverted2.equals(SlowDiskReports.EMPTY_REPORT));
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index fbfccbe74a6..f234bcc14da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -113,7 +114,8 @@ private static void runTest(final String testCaseName,
     // Stop the DataNode and send fake heartbeat with missing storage.
     cluster.stopDataNode(0);
     cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
-        0, null, true, SlowPeerReports.EMPTY_REPORT);
+        0, null, true, SlowPeerReports.EMPTY_REPORT,
+        SlowDiskReports.EMPTY_REPORT);
     // Check that the missing storage was pruned.
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java index cf43fd0fddc..876a8545ec5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -138,7 +139,8 @@ public DatanodeRegistration answer(InvocationOnMock invocation) Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), - Mockito.any(SlowPeerReports.class))).thenReturn( + Mockito.any(SlowPeerReports.class), + Mockito.any(SlowDiskReports.class))).thenReturn( new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat( HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current() .nextLong() | 1L)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index c6b38eea7db..b9220e0f6c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNotNull; @@ -154,7 +155,8 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx) Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), - Mockito.any(SlowPeerReports.class)); + Mockito.any(SlowPeerReports.class), + Mockito.any(SlowDiskReports.class)); mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0); datanodeCommands[nnIdx] = new DatanodeCommand[0]; return mock; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index fecddc57886..ab1ad486377 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; @@ -189,7 +190,8 @@ public DatanodeRegistration answer(InvocationOnMock invocation) Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), - Mockito.any(SlowPeerReports.class))) + 
Mockito.any(SlowPeerReports.class), + Mockito.any(SlowDiskReports.class))) .thenReturn(new HeartbeatResponse( new DatanodeCommand[0], new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java index 6435d4d1a5c..753c3a8d6fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java @@ -51,7 +51,7 @@ public class TestBpServiceActorScheduler { private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS; private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds - private static final long SLOW_PEER_REPORT_INTERVAL_MS = 10000; // 10 seconds + private static final long OUTLIER_REPORT_INTERVAL_MS = 10000; // 10 seconds private final Random random = new Random(System.nanoTime()); @Test @@ -182,15 +182,15 @@ public void testScheduleLifeline() { } @Test - public void testSlowPeerReportScheduling() { + public void testOutlierReportScheduling() { for (final long now : getTimestamps()) { Scheduler scheduler = makeMockScheduler(now); - assertTrue(scheduler.isSlowPeersReportDue(now)); - scheduler.scheduleNextSlowPeerReport(); - assertFalse(scheduler.isSlowPeersReportDue(now)); - assertFalse(scheduler.isSlowPeersReportDue(now + 1)); - assertTrue(scheduler.isSlowPeersReportDue( - now + SLOW_PEER_REPORT_INTERVAL_MS)); + assertTrue(scheduler.isOutliersReportDue(now)); + scheduler.scheduleNextOutlierReport(); + assertFalse(scheduler.isOutliersReportDue(now)); + assertFalse(scheduler.isOutliersReportDue(now + 1)); + assertTrue(scheduler.isOutliersReportDue( + now + OUTLIER_REPORT_INTERVAL_MS)); } } @@ -198,11 +198,11 @@ private Scheduler makeMockScheduler(long now) { LOG.info("Using now = " + now); Scheduler mockScheduler = spy(new Scheduler( HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS, - BLOCK_REPORT_INTERVAL_MS, SLOW_PEER_REPORT_INTERVAL_MS)); + BLOCK_REPORT_INTERVAL_MS, OUTLIER_REPORT_INTERVAL_MS)); doReturn(now).when(mockScheduler).monotonicNow(); mockScheduler.nextBlockReportTime = now; mockScheduler.nextHeartbeatTime = now; - mockScheduler.nextSlowPeersReportTime = now; + mockScheduler.nextOutliersReportTime = now; return mockScheduler; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java index 8a9f0b8da1a..28427bce75a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java @@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY; + +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static 
org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; @@ -169,7 +171,8 @@ public void testSendLifelineIfHeartbeatBlocked() throws Exception { anyInt(), any(VolumeFailureSummary.class), anyBoolean(), - any(SlowPeerReports.class)); + any(SlowPeerReports.class), + any(SlowDiskReports.class)); // Intercept lifeline to trigger latch count-down on each call. doAnswer(new LatchCountingAnswer(lifelinesSent)) @@ -233,7 +236,8 @@ public void testNoLifelineSentIfHeartbeatsOnTime() throws Exception { anyInt(), any(VolumeFailureSummary.class), anyBoolean(), - any(SlowPeerReports.class)); + any(SlowPeerReports.class), + any(SlowDiskReports.class)); // While waiting on the latch for the expected number of heartbeat messages, // poll DataNode tracking information. We expect that the DataNode always diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java index 89a96be5e04..1fcde6e8707 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java @@ -221,7 +221,7 @@ public void testDataNodeMXBeanSlowDisksEnabled() throws Exception { Assert.assertEquals(datanodes.size(), 1); DataNode datanode = datanodes.get(0); String slowDiskPath = "test/data1/slowVolume"; - datanode.getDiskMetrics().addSlowDiskForTesting(slowDiskPath); + datanode.getDiskMetrics().addSlowDiskForTesting(slowDiskPath, null); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ObjectName mxbeanName = new ObjectName( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java index c94f74ecc52..bb1d9eff0ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -220,7 +221,8 @@ public HeartbeatResponse answer(InvocationOnMock invocation) Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), - Mockito.any(SlowPeerReports.class)); + Mockito.any(SlowPeerReports.class), + Mockito.any(SlowDiskReports.class)); dn = new DataNode(conf, locations, null, null) { @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index eb015c03573..28bf13bb395 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static 
org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -173,7 +174,8 @@ private static void setHeartbeatResponse(DatanodeCommand[] cmds) (DatanodeRegistration) any(), (StorageReport[]) any(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(), - anyBoolean(), any(SlowPeerReports.class)); + anyBoolean(), any(SlowPeerReports.class), + any(SlowDiskReports.class)); } private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java index 2b793e9caaf..5f62ddb084b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; @@ -108,7 +109,8 @@ public void testStorageReportHasStorageTypeAndState() throws IOException { captor.capture(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt(), Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), - Mockito.any(SlowPeerReports.class)); + Mockito.any(SlowPeerReports.class), + Mockito.any(SlowDiskReports.class)); StorageReport[] reports = captor.getValue(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index 76ccd406eaf..b65f1113254 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; @@ -953,7 +954,8 @@ void sendHeartbeat() throws IOException { DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) }; DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep, 0L, 0L, 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT).getCommands(); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT) + .getCommands(); if(cmds != null) { for (DatanodeCommand cmd : cmds ) { if(LOG.isDebugEnabled()) { @@ -1003,7 +1005,8 @@ int replicateBlocks() throws IOException { false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) }; DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep, 0L, 0L, 0, 0, 0, 
null, true, - SlowPeerReports.EMPTY_REPORT).getCommands(); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT) + .getCommands(); if (cmds != null) { for (DatanodeCommand cmd : cmds) { if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index 081027646d8..ae553665807 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.mockito.Mockito.spy; import java.io.File; @@ -119,7 +120,7 @@ public static HeartbeatResponse sendHeartBeat(DatanodeRegistration nodeReg, return namesystem.handleHeartbeat(nodeReg, BlockManagerTestUtil.getStorageReportsForDatanode(dd), dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); } public static boolean setReplication(final FSNamesystem ns, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index a72b2a2f250..033acf29af8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; @@ -133,7 +134,8 @@ public void testDeadDatanode() throws Exception { false, 0, 0, 0, 0, 0) }; DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true, - SlowPeerReports.EMPTY_REPORT).getCommands(); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT) + .getCommands(); assertEquals(1, cmd.length); assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER .getAction());
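
For reviewers, a standalone sketch of how the new SlowDiskReports API is meant to be used, mirroring what testSlowDiskInfoPBHelper exercises. The class name SlowDiskReportsExample and the /data/* paths are made up for illustration; only SlowDiskReports, its nested DiskOp enum, and Guava's ImmutableMap are assumed, all of which appear in the diff above.

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;
    import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
    import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;

    /** Illustrative usage of the SlowDiskReports API introduced by HDFS-11545. */
    public class SlowDiskReportsExample {
      public static void main(String[] args) {
        // Per-disk mean latencies keyed by the disk's base path, shaped like the
        // map a DataNode builds in DataNodeDiskMetrics#getDiskOutliersStats().
        Map<String, Map<DiskOp, Double>> latencies =
            ImmutableMap.<String, Map<DiskOp, Double>>of(
                "/data/1", ImmutableMap.of(DiskOp.METADATA, 0.5),
                "/data/2", ImmutableMap.of(DiskOp.READ, 1.0, DiskOp.WRITE, 1.3));

        // create() wraps the map; BPServiceActor attaches the result to the
        // heartbeat only when the periodic outliers report is due.
        SlowDiskReports reports = SlowDiskReports.create(latencies);
        System.out.println(reports.haveSlowDisks());          // true
        System.out.println(reports.getSlowDisks().keySet());  // [/data/1, /data/2]

        // A null or empty map collapses to the shared EMPTY_REPORT instance,
        // which is what a DataNode sends when it has nothing to report.
        SlowDiskReports empty = SlowDiskReports.create(null);
        System.out.println(empty == SlowDiskReports.EMPTY_REPORT);  // true
      }
    }

The protobuf round trip behaves the same way: PBHelper.convertSlowDiskInfo on an empty or absent list returns EMPTY_REPORT, so heartbeats from older DataNodes that never set the new repeated slowDisks field are handled transparently.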