diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java index 4fe72fc4a49..3c871d3ef50 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/metrics/SCMNodeStat.java @@ -136,25 +136,4 @@ public class SCMNodeStat implements NodeStat { public int hashCode() { return Long.hashCode(capacity.get() ^ scmUsed.get() ^ remaining.get()); } - - - /** - * Truncate to 4 digits since uncontrolled precision is some times - * counter intuitive to what users expect. - * @param value - double. - * @return double. - */ - private double truncateDecimals(double value) { - final int multiplier = 10000; - return (double) ((long) (value * multiplier)) / multiplier; - } - - /** - * get the scmUsed ratio - */ - public double getScmUsedratio() { - double scmUsedRatio = - truncateDecimals(getScmUsed().get() / (double) getCapacity().get()); - return scmUsedRatio; - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java index f17a9703aa2..d81ff0f3cb6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMXBean.java @@ -19,7 +19,9 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; +import java.util.Set; import java.util.UUID; /** @@ -66,4 +68,10 @@ public interface SCMNodeStorageStatMXBean { * @return long */ long getTotalFreeSpace(); + + /** + * Returns the set of disks for a given Datanode. + * @return set of storage volumes + */ + Set getStorageVolumes(UUID datanodeId); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java index 25cb357d3e9..f8ad2af8526 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeStorageStatMap.java @@ -22,18 +22,18 @@ package org.apache.hadoop.hdds.scm.node; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.SCMStorageReport; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.management.ObjectName; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.io.IOException; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -52,16 +52,15 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean { private final double warningUtilizationThreshold; private final double criticalUtilizationThreshold; - private final Map scmNodeStorageStatMap; + private final Map> scmNodeStorageReportMap; // NodeStorageInfo MXBean private ObjectName scmNodeStorageInfoBean; - // Aggregated node stats - private SCMNodeStat clusterStat; /** - * constructs the scmNodeStorageStatMap object + * constructs the scmNodeStorageReportMap object */ public SCMNodeStorageStatMap(OzoneConfiguration conf) { - scmNodeStorageStatMap = new ConcurrentHashMap<>(); + // scmNodeStorageReportMap = new ConcurrentHashMap<>(); + scmNodeStorageReportMap = new ConcurrentHashMap<>(); warningUtilizationThreshold = conf.getDouble( OzoneConfigKeys. HDDS_DATANODE_STORAGE_UTILIZATION_WARNING_THRESHOLD, @@ -72,7 +71,6 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean { HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD, OzoneConfigKeys. HDDS_DATANODE_STORAGE_UTILIZATION_CRITICAL_THRESHOLD_DEFAULT); - clusterStat = new SCMNodeStat(); } public enum UtilizationThreshold { @@ -81,20 +79,22 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean { /** * Returns true if this a datanode that is already tracked by - * scmNodeStorageStatMap. + * scmNodeStorageReportMap. * * @param datanodeID - UUID of the Datanode. * @return True if this is tracked, false if this map does not know about it. */ public boolean isKnownDatanode(UUID datanodeID) { Preconditions.checkNotNull(datanodeID); - return scmNodeStorageStatMap.containsKey(datanodeID); + return scmNodeStorageReportMap.containsKey(datanodeID); } public List getDatanodeList( UtilizationThreshold threshold) { - return scmNodeStorageStatMap.entrySet().stream() - .filter(entry -> (isThresholdReached(threshold, entry.getValue()))) + return scmNodeStorageReportMap.entrySet().stream().filter( + entry -> (isThresholdReached(threshold, + getScmUsedratio(getUsedSpace(entry.getKey()), + getCapacity(entry.getKey()))))) .map(Map.Entry::getKey) .collect(Collectors.toList()); } @@ -105,19 +105,19 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean { * Insert a new datanode into Node2Container Map. * * @param datanodeID -- Datanode UUID - * @param stat - scmNode stat for the Datanode. + * @param report - set if StorageReports. */ - public void insertNewDatanode(UUID datanodeID, SCMNodeStat stat) + public void insertNewDatanode(UUID datanodeID, Set report) throws SCMException { - Preconditions.checkNotNull(stat); + Preconditions.checkNotNull(report); + Preconditions.checkState(report.size() != 0); Preconditions.checkNotNull(datanodeID); - synchronized (scmNodeStorageStatMap) { + synchronized (scmNodeStorageReportMap) { if (isKnownDatanode(datanodeID)) { throw new SCMException("Node already exists in the map", DUPLICATE_DATANODE); } - scmNodeStorageStatMap.put(datanodeID, stat); - clusterStat.add(stat); + scmNodeStorageReportMap.putIfAbsent(datanodeID, report); } } @@ -138,72 +138,103 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean { * Updates the Container list of an existing DN. * * @param datanodeID - UUID of DN. - * @param stat - scmNode stat for the Datanode. + * @param report - set of Storage Reports for the Datanode. * @throws SCMException - if we don't know about this datanode, for new DN * use insertNewDatanode. */ - public void updateDatanodeMap(UUID datanodeID, SCMNodeStat stat) + public void updateDatanodeMap(UUID datanodeID, Set report) throws SCMException { Preconditions.checkNotNull(datanodeID); - Preconditions.checkNotNull(stat); - synchronized (scmNodeStorageStatMap) { - if (!scmNodeStorageStatMap.containsKey(datanodeID)) { + Preconditions.checkNotNull(report); + Preconditions.checkState(report.size() != 0); + synchronized (scmNodeStorageReportMap) { + if (!scmNodeStorageReportMap.containsKey(datanodeID)) { throw new SCMException("No such datanode", NO_SUCH_DATANODE); } - SCMNodeStat removed = scmNodeStorageStatMap.get(datanodeID); - clusterStat.subtract(removed); - scmNodeStorageStatMap.put(datanodeID, stat); - clusterStat.add(stat); + scmNodeStorageReportMap.put(datanodeID, report); } } - public NodeReportStatus processNodeReport(UUID datanodeID, + public StorageReportResult processNodeReport(UUID datanodeID, StorageContainerDatanodeProtocolProtos.SCMNodeReport nodeReport) - throws SCMException { + throws IOException { Preconditions.checkNotNull(datanodeID); Preconditions.checkNotNull(nodeReport); + long totalCapacity = 0; long totalRemaining = 0; long totalScmUsed = 0; - List + Set storagReportSet = new HashSet<>(); + Set fullVolumeSet = new HashSet<>(); + Set failedVolumeSet = new HashSet<>(); + List storageReports = nodeReport.getStorageReportList(); - for (StorageContainerDatanodeProtocolProtos.SCMStorageReport report : storageReports) { + for (SCMStorageReport report : storageReports) { + StorageLocationReport storageReport = + StorageLocationReport.getFromProtobuf(report); + storagReportSet.add(storageReport); + if (report.hasFailed() && report.getFailed()) { + failedVolumeSet.add(storageReport); + } else if (isThresholdReached(UtilizationThreshold.CRITICAL, + getScmUsedratio(report.getScmUsed(), report.getCapacity()))) { + fullVolumeSet.add(storageReport); + } totalCapacity += report.getCapacity(); totalRemaining += report.getRemaining(); totalScmUsed += report.getScmUsed(); } - SCMNodeStat stat = scmNodeStorageStatMap.get(datanodeID); - if (stat == null) { - stat = new SCMNodeStat(); - stat.set(totalCapacity, totalScmUsed, totalRemaining); - insertNewDatanode(datanodeID, stat); + + if (!isKnownDatanode(datanodeID)) { + insertNewDatanode(datanodeID, storagReportSet); } else { - stat.set(totalCapacity, totalScmUsed, totalRemaining); - updateDatanodeMap(datanodeID, stat); + updateDatanodeMap(datanodeID, storagReportSet); } - if (isThresholdReached(UtilizationThreshold.CRITICAL, stat)) { + if (isThresholdReached(UtilizationThreshold.CRITICAL, + getScmUsedratio(totalScmUsed, totalCapacity))) { LOG.warn("Datanode {} is out of storage space. Capacity: {}, Used: {}", - datanodeID, stat.getCapacity().get(), stat.getScmUsed().get()); - return NodeReportStatus.DATANODE_OUT_OF_SPACE; - } else { - if (isThresholdReached(UtilizationThreshold.WARN, stat)) { - LOG.warn("Datanode {} is low on storage space. Capacity: {}, Used: {}", - datanodeID, stat.getCapacity().get(), stat.getScmUsed().get()); - } - return NodeReportStatus.ALL_IS_WELL; + datanodeID, totalCapacity, totalScmUsed); + return StorageReportResult.ReportResultBuilder.newBuilder() + .setStatus(ReportStatus.DATANODE_OUT_OF_SPACE) + .setFullVolumeSet(fullVolumeSet).setFailedVolumeSet(failedVolumeSet) + .build(); } + if (isThresholdReached(UtilizationThreshold.WARN, + getScmUsedratio(totalScmUsed, totalCapacity))) { + LOG.warn("Datanode {} is low on storage space. Capacity: {}, Used: {}", + datanodeID, totalCapacity, totalScmUsed); + } + + if (failedVolumeSet.isEmpty() && !fullVolumeSet.isEmpty()) { + return StorageReportResult.ReportResultBuilder.newBuilder() + .setStatus(ReportStatus.STORAGE_OUT_OF_SPACE) + .setFullVolumeSet(fullVolumeSet).build(); + } + + if (!failedVolumeSet.isEmpty() && fullVolumeSet.isEmpty()) { + return StorageReportResult.ReportResultBuilder.newBuilder() + .setStatus(ReportStatus.FAILED_STORAGE) + .setFailedVolumeSet(failedVolumeSet).build(); + } + if (!failedVolumeSet.isEmpty() && !fullVolumeSet.isEmpty()) { + return StorageReportResult.ReportResultBuilder.newBuilder() + .setStatus(ReportStatus.FAILED_AND_OUT_OF_SPACE_STORAGE) + .setFailedVolumeSet(failedVolumeSet).setFullVolumeSet(fullVolumeSet) + .build(); + } + return StorageReportResult.ReportResultBuilder.newBuilder() + .setStatus(ReportStatus.ALL_IS_WELL).build(); } private boolean isThresholdReached(UtilizationThreshold threshold, - SCMNodeStat stat) { + double scmUsedratio) { switch (threshold) { case NORMAL: - return stat.getScmUsedratio() < warningUtilizationThreshold; + return scmUsedratio < warningUtilizationThreshold; case WARN: - return stat.getScmUsedratio() >= warningUtilizationThreshold && - stat.getScmUsedratio() < criticalUtilizationThreshold; + return scmUsedratio >= warningUtilizationThreshold + && scmUsedratio < criticalUtilizationThreshold; case CRITICAL: - return stat.getScmUsedratio() >= criticalUtilizationThreshold; + return scmUsedratio >= criticalUtilizationThreshold; default: throw new RuntimeException("Unknown UtilizationThreshold value"); } @@ -211,67 +242,120 @@ public class SCMNodeStorageStatMap implements SCMNodeStorageStatMXBean { @Override public long getCapacity(UUID dnId) { - return scmNodeStorageStatMap.get(dnId).getCapacity().get(); + long capacity = 0; + Set reportSet = scmNodeStorageReportMap.get(dnId); + for (StorageLocationReport report : reportSet) { + capacity += report.getCapacity(); + } + return capacity; } @Override public long getRemainingSpace(UUID dnId) { - return scmNodeStorageStatMap.get(dnId).getRemaining().get(); + long remaining = 0; + Set reportSet = scmNodeStorageReportMap.get(dnId); + for (StorageLocationReport report : reportSet) { + remaining += report.getRemaining(); + } + return remaining; } @Override public long getUsedSpace(UUID dnId) { - return scmNodeStorageStatMap.get(dnId).getScmUsed().get(); + long scmUsed = 0; + Set reportSet = scmNodeStorageReportMap.get(dnId); + for (StorageLocationReport report : reportSet) { + scmUsed += report.getScmUsed(); + } + return scmUsed; } @Override public long getTotalCapacity() { - return clusterStat.getCapacity().get(); + long capacity = 0; + Set dnIdSet = scmNodeStorageReportMap.keySet(); + for (UUID id : dnIdSet) { + capacity += getCapacity(id); + } + return capacity; } @Override public long getTotalSpaceUsed() { - return clusterStat.getScmUsed().get(); + long scmUsed = 0; + Set dnIdSet = scmNodeStorageReportMap.keySet(); + for (UUID id : dnIdSet) { + scmUsed += getUsedSpace(id); + } + return scmUsed; } @Override public long getTotalFreeSpace() { - return clusterStat.getRemaining().get(); + long remaining = 0; + Set dnIdSet = scmNodeStorageReportMap.keySet(); + for (UUID id : dnIdSet) { + remaining += getRemainingSpace(id); + } + return remaining; } /** - * removes the dataNode from scmNodeStorageStatMap + * removes the dataNode from scmNodeStorageReportMap * @param datanodeID * @throws SCMException in case the dataNode is not found in the map. */ public void removeDatanode(UUID datanodeID) throws SCMException { Preconditions.checkNotNull(datanodeID); - synchronized (scmNodeStorageStatMap) { - if (!scmNodeStorageStatMap.containsKey(datanodeID)) { + synchronized (scmNodeStorageReportMap) { + if (!scmNodeStorageReportMap.containsKey(datanodeID)) { throw new SCMException("No such datanode", NO_SUCH_DATANODE); } - SCMNodeStat stat = scmNodeStorageStatMap.remove(datanodeID); - clusterStat.subtract(stat); + scmNodeStorageReportMap.remove(datanodeID); } } /** - * Gets the SCMNodeStat for the datanode + * Returns the set of storage volumes for a Datanode. * @param datanodeID - * @return SCMNodeStat + * @return set of storage volumes. */ - SCMNodeStat getNodeStat(UUID datanodeID) { - return scmNodeStorageStatMap.get(datanodeID); + @Override + public Set getStorageVolumes(UUID datanodeID) { + return scmNodeStorageReportMap.get(datanodeID); } + + /** + * Truncate to 4 digits since uncontrolled precision is some times + * counter intuitive to what users expect. + * @param value - double. + * @return double. + */ + private double truncateDecimals(double value) { + final int multiplier = 10000; + return (double) ((long) (value * multiplier)) / multiplier; + } + + /** + * get the scmUsed ratio + */ + public double getScmUsedratio(long scmUsed, long capacity) { + double scmUsedRatio = + truncateDecimals (scmUsed / (double) capacity); + return scmUsedRatio; + } /** * Results possible from processing a Node report by * Node2ContainerMapper. */ - public enum NodeReportStatus { + public enum ReportStatus { ALL_IS_WELL, - DATANODE_OUT_OF_SPACE + DATANODE_OUT_OF_SPACE, + STORAGE_OUT_OF_SPACE, + FAILED_STORAGE, + FAILED_AND_OUT_OF_SPACE_STORAGE } -} +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java new file mode 100644 index 00000000000..3436e7794d1 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StorageReportResult.java @@ -0,0 +1,87 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; + +import java.util.Set; + +/** + * A Container Report gets processsed by the Node2Container and returns the + * Report Result class. + */ +public class StorageReportResult { + private SCMNodeStorageStatMap.ReportStatus status; + private Set fullVolumes; + private Set failedVolumes; + + StorageReportResult(SCMNodeStorageStatMap.ReportStatus status, + Set fullVolumes, + Set failedVolumes) { + this.status = status; + this.fullVolumes = fullVolumes; + this.failedVolumes = failedVolumes; + } + + public SCMNodeStorageStatMap.ReportStatus getStatus() { + return status; + } + + public Set getFullVolumes() { + return fullVolumes; + } + + public Set getFailedVolumes() { + return failedVolumes; + } + + static class ReportResultBuilder { + private SCMNodeStorageStatMap.ReportStatus status; + private Set fullVolumes; + private Set failedVolumes; + + static ReportResultBuilder newBuilder() { + return new ReportResultBuilder(); + } + + public ReportResultBuilder setStatus( + SCMNodeStorageStatMap.ReportStatus newstatus) { + this.status = newstatus; + return this; + } + + public ReportResultBuilder setFullVolumeSet( + Set fullVolumes) { + this.fullVolumes = fullVolumes; + return this; + } + + public ReportResultBuilder setFailedVolumeSet( + Set failedVolumes) { + this.failedVolumes = failedVolumes; + return this; + } + + StorageReportResult build() { + return new StorageReportResult(status, fullVolumes, failedVolumes); + } + } +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java index 2fa786b5464..571de77986a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeStorageStatMap.java @@ -17,38 +17,56 @@ */ package org.apache.hadoop.hdds.scm.node; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.hdds.protocol.proto. + StorageContainerDatanodeProtocolProtos.SCMStorageReport; import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMStorageReport; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport; import org.apache.hadoop.test.GenericTestUtils; import org.junit.*; +import org.junit.Rule; import org.junit.rules.ExpectedException; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.Set; +import java.util.ArrayList; +import java.util.HashSet; +import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; public class TestSCMNodeStorageStatMap { - private final static int DATANODE_COUNT = 300; + private final static int DATANODE_COUNT = 100; final long capacity = 10L * OzoneConsts.GB; final long used = 2L * OzoneConsts.GB; final long remaining = capacity - used; private static OzoneConfiguration conf = new OzoneConfiguration(); - private final Map testData = new ConcurrentHashMap<>(); + private final Map> testData = + new ConcurrentHashMap<>(); @Rule public ExpectedException thrown = ExpectedException.none(); private void generateData() { - SCMNodeStat stat = new SCMNodeStat(); - stat.set(capacity, used, remaining); for (int dnIndex = 1; dnIndex <= DATANODE_COUNT; dnIndex++) { - testData.put(UUID.randomUUID(), stat); + UUID dnId = UUID.randomUUID(); + Set reportSet = new HashSet<>(); + String path = GenericTestUtils.getTempPath( + TestSCMNodeStorageStatMap.class.getSimpleName() + "-" + Integer + .toString(dnIndex)); + StorageLocationReport.Builder builder = StorageLocationReport.newBuilder(); + builder.setStorageType(StorageType.DISK).setId(dnId.toString()) + .setStorageLocation(path).setScmUsed(used).setRemaining(remaining) + .setCapacity(capacity).setFailed(false); + reportSet.add(builder.build()); + testData.put(UUID.randomUUID(), reportSet); } } @@ -70,8 +88,8 @@ public class TestSCMNodeStorageStatMap { SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf); UUID knownNode = getFirstKey(); UUID unknownNode = UUID.randomUUID(); - SCMNodeStat stat = testData.get(knownNode); - map.insertNewDatanode(knownNode, stat); + Set report = testData.get(knownNode); + map.insertNewDatanode(knownNode, report); Assert.assertTrue("Not able to detect a known node", map.isKnownDatanode(knownNode)); Assert.assertFalse("Unknown node detected", @@ -82,54 +100,89 @@ public class TestSCMNodeStorageStatMap { public void testInsertNewDatanode() throws SCMException { SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf); UUID knownNode = getFirstKey(); - SCMNodeStat stat = testData.get(knownNode); - map.insertNewDatanode(knownNode, stat); - Assert.assertEquals(map.getNodeStat(knownNode).getScmUsed(), - testData.get(knownNode).getScmUsed()); + Set report = testData.get(knownNode); + map.insertNewDatanode(knownNode, report); + Assert.assertEquals(map.getStorageVolumes(knownNode), + testData.get(knownNode)); thrown.expect(SCMException.class); thrown.expectMessage("already exists"); - map.insertNewDatanode(knownNode, stat); + map.insertNewDatanode(knownNode, report); } @Test public void testUpdateUnknownDatanode() throws SCMException { SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf); UUID unknownNode = UUID.randomUUID(); - SCMNodeStat stat = new SCMNodeStat(); - + String path = GenericTestUtils.getTempPath( + TestSCMNodeStorageStatMap.class.getSimpleName() + "-" + unknownNode + .toString()); + Set reportSet = new HashSet<>(); + StorageLocationReport.Builder builder = StorageLocationReport.newBuilder(); + builder.setStorageType(StorageType.DISK).setId(unknownNode.toString()) + .setStorageLocation(path).setScmUsed(used).setRemaining(remaining) + .setCapacity(capacity).setFailed(false); + reportSet.add(builder.build()); thrown.expect(SCMException.class); thrown.expectMessage("No such datanode"); - map.updateDatanodeMap(unknownNode, stat); + map.updateDatanodeMap(unknownNode, reportSet); } @Test - public void testProcessNodeReportCheckOneNode() throws SCMException { + public void testProcessNodeReportCheckOneNode() throws IOException { UUID key = getFirstKey(); - SCMNodeStat value = testData.get(key); + List reportList = new ArrayList<>(); + Set reportSet = testData.get(key); SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf); - map.insertNewDatanode(key, value); + map.insertNewDatanode(key, reportSet); Assert.assertTrue(map.isKnownDatanode(key)); String storageId = UUID.randomUUID().toString(); String path = GenericTestUtils.getRandomizedTempPath().concat("/" + storageId); - long capacity = value.getCapacity().get(); - long used = value.getScmUsed().get(); - long remaining = value.getRemaining().get(); + StorageLocationReport report = reportSet.iterator().next(); + long capacity = report.getCapacity(); + long used = report.getScmUsed(); + long remaining = report.getRemaining(); List reports = TestUtils .createStorageReport(capacity, used, remaining, path, null, storageId, 1); - SCMNodeStorageStatMap.NodeReportStatus status = + StorageReportResult result = map.processNodeReport(key, TestUtils.createNodeReport(reports)); - Assert.assertEquals(status, - SCMNodeStorageStatMap.NodeReportStatus.ALL_IS_WELL); + Assert.assertEquals(result.getStatus(), + SCMNodeStorageStatMap.ReportStatus.ALL_IS_WELL); + StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb = + SCMNodeReport.newBuilder(); + SCMStorageReport srb = reportSet.iterator().next().getProtoBufMessage(); + reportList.add(srb); + result = map.processNodeReport(key, TestUtils.createNodeReport(reportList)); + Assert.assertEquals(result.getStatus(), + SCMNodeStorageStatMap.ReportStatus.ALL_IS_WELL); + + reportList.add(TestUtils + .createStorageReport(capacity, capacity, 0, path, null, + UUID.randomUUID().toString(), 1).get(0)); + result = map.processNodeReport(key, TestUtils.createNodeReport(reportList)); + Assert.assertEquals(result.getStatus(), + SCMNodeStorageStatMap.ReportStatus.STORAGE_OUT_OF_SPACE); + // Mark a disk failed + SCMStorageReport srb2 = SCMStorageReport.newBuilder() + .setStorageUuid(UUID.randomUUID().toString()) + .setStorageLocation(srb.getStorageLocation()).setScmUsed(capacity) + .setCapacity(capacity).setRemaining(0).setFailed(true).build(); + reportList.add(srb2); + nrb.addAllStorageReport(reportList); + result = map.processNodeReport(key, nrb.addStorageReport(srb).build()); + Assert.assertEquals(result.getStatus(), + SCMNodeStorageStatMap.ReportStatus.FAILED_AND_OUT_OF_SPACE_STORAGE); + } @Test - public void testProcessNodeReportAndSCMStats() throws SCMException { + public void testProcessMultipleNodeReports() throws SCMException { SCMNodeStorageStatMap map = new SCMNodeStorageStatMap(conf); int counter = 1; // Insert all testData into the SCMNodeStorageStatMap Map. - for (Map.Entry keyEntry : testData.entrySet()) { + for (Map.Entry> keyEntry : testData + .entrySet()) { map.insertNewDatanode(keyEntry.getKey(), keyEntry.getValue()); } Assert.assertEquals(DATANODE_COUNT * capacity, map.getTotalCapacity()); @@ -137,9 +190,21 @@ public class TestSCMNodeStorageStatMap { Assert.assertEquals(DATANODE_COUNT * used, map.getTotalSpaceUsed()); // upadate 1/4th of the datanode to be full - for (Map.Entry keyEntry : testData.entrySet()) { - SCMNodeStat stat = new SCMNodeStat(capacity, capacity, 0); - map.updateDatanodeMap(keyEntry.getKey(), stat); + for (Map.Entry> keyEntry : testData + .entrySet()) { + Set reportSet = new HashSet<>(); + String path = GenericTestUtils.getTempPath( + TestSCMNodeStorageStatMap.class.getSimpleName() + "-" + keyEntry + .getKey().toString()); + StorageLocationReport.Builder builder = + StorageLocationReport.newBuilder(); + builder.setStorageType(StorageType.DISK) + .setId(keyEntry.getKey().toString()).setStorageLocation(path) + .setScmUsed(capacity).setRemaining(0).setCapacity(capacity) + .setFailed(false); + reportSet.add(builder.build()); + + map.updateDatanodeMap(keyEntry.getKey(), reportSet); counter++; if (counter > DATANODE_COUNT / 4) { break; @@ -163,7 +228,8 @@ public class TestSCMNodeStorageStatMap { map.getTotalSpaceUsed(), 0); counter = 1; // Remove 1/4 of the DataNodes from the Map - for (Map.Entry keyEntry : testData.entrySet()) { + for (Map.Entry> keyEntry : testData + .entrySet()) { map.removeDatanode(keyEntry.getKey()); counter++; if (counter > DATANODE_COUNT / 4) { @@ -181,12 +247,13 @@ public class TestSCMNodeStorageStatMap { map.getDatanodeList(SCMNodeStorageStatMap.UtilizationThreshold.NORMAL) .size(), 0); - Assert.assertEquals(0.75 * DATANODE_COUNT * capacity, map.getTotalCapacity(), 0); + Assert + .assertEquals(0.75 * DATANODE_COUNT * capacity, map.getTotalCapacity(), + 0); Assert.assertEquals(0.75 * DATANODE_COUNT * remaining, map.getTotalFreeSpace(), 0); - Assert.assertEquals( - 0.75 * DATANODE_COUNT * used , - map.getTotalSpaceUsed(), 0); + Assert + .assertEquals(0.75 * DATANODE_COUNT * used, map.getTotalSpaceUsed(), 0); } }