HDFS-7390. Provide JMX metrics per storage type. (Benoy Antony)
(cherry picked from commit d3fed8e653
)
This commit is contained in:
parent
438bb25d43
commit
e40e9fc7f4
|
@ -38,6 +38,8 @@ import java.util.TreeSet;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -83,6 +85,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||||
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
|
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
|
||||||
|
import org.apache.hadoop.metrics2.util.MBeans;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
|
@ -92,6 +95,7 @@ import org.apache.hadoop.util.Time;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -99,7 +103,7 @@ import org.slf4j.LoggerFactory;
|
||||||
* Keeps information related to the blocks stored in the Hadoop cluster.
|
* Keeps information related to the blocks stored in the Hadoop cluster.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class BlockManager {
|
public class BlockManager implements BlockStatsMXBean {
|
||||||
|
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(BlockManager.class);
|
public static final Logger LOG = LoggerFactory.getLogger(BlockManager.class);
|
||||||
public static final Logger blockLog = NameNode.blockStateChangeLog;
|
public static final Logger blockLog = NameNode.blockStateChangeLog;
|
||||||
|
@ -127,6 +131,7 @@ public class BlockManager {
|
||||||
private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
|
private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
|
||||||
private final long startupDelayBlockDeletionInMs;
|
private final long startupDelayBlockDeletionInMs;
|
||||||
private final BlockReportLeaseManager blockReportLeaseManager;
|
private final BlockReportLeaseManager blockReportLeaseManager;
|
||||||
|
private ObjectName mxBeanName;
|
||||||
|
|
||||||
/** Used by metrics */
|
/** Used by metrics */
|
||||||
public long getPendingReplicationBlocksCount() {
|
public long getPendingReplicationBlocksCount() {
|
||||||
|
@ -459,6 +464,7 @@ public class BlockManager {
|
||||||
pendingReplications.start();
|
pendingReplications.start();
|
||||||
datanodeManager.activate(conf);
|
datanodeManager.activate(conf);
|
||||||
this.replicationThread.start();
|
this.replicationThread.start();
|
||||||
|
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
|
@ -3932,6 +3938,8 @@ public class BlockManager {
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
stopReplicationInitializer();
|
stopReplicationInitializer();
|
||||||
blocksMap.close();
|
blocksMap.close();
|
||||||
|
MBeans.unregister(mxBeanName);
|
||||||
|
mxBeanName = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear() {
|
public void clear() {
|
||||||
|
@ -3942,4 +3950,9 @@ public class BlockManager {
|
||||||
public BlockReportLeaseManager getBlockReportLeaseManager() {
|
public BlockReportLeaseManager getBlockReportLeaseManager() {
|
||||||
return blockReportLeaseManager;
|
return blockReportLeaseManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override // BlockStatsMXBean
|
||||||
|
public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
|
||||||
|
return datanodeManager.getDatanodeStatistics().getStorageTypeStats();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is an interface used to retrieve statistic information related to
|
||||||
|
* block management.
|
||||||
|
*/
|
||||||
|
public interface BlockStatsMXBean {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The statistics of storage types.
|
||||||
|
*
|
||||||
|
* @return get storage statistics per storage type
|
||||||
|
*/
|
||||||
|
Map<StorageType, StorageTypeStats> getStorageTypeStats();
|
||||||
|
}
|
|
@ -17,6 +17,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
|
|
||||||
/** Datanode statistics */
|
/** Datanode statistics */
|
||||||
|
@ -71,4 +74,7 @@ public interface DatanodeStatistics {
|
||||||
|
|
||||||
/** @return the expired heartbeats */
|
/** @return the expired heartbeats */
|
||||||
public int getExpiredHeartbeats();
|
public int getExpiredHeartbeats();
|
||||||
|
|
||||||
|
/** @return Storage Tier statistics*/
|
||||||
|
Map<StorageType, StorageTypeStats> getStorageTypeStats();
|
||||||
}
|
}
|
|
@ -18,9 +18,15 @@
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.IdentityHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
@ -189,6 +195,11 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
return stats.expiredHeartbeats;
|
return stats.expiredHeartbeats;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
|
||||||
|
return stats.statsMap.get();
|
||||||
|
}
|
||||||
|
|
||||||
synchronized void register(final DatanodeDescriptor d) {
|
synchronized void register(final DatanodeDescriptor d) {
|
||||||
if (!d.isAlive) {
|
if (!d.isAlive) {
|
||||||
addDatanode(d);
|
addDatanode(d);
|
||||||
|
@ -393,6 +404,9 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
* For decommissioning/decommissioned nodes, only used capacity is counted.
|
* For decommissioning/decommissioned nodes, only used capacity is counted.
|
||||||
*/
|
*/
|
||||||
private static class Stats {
|
private static class Stats {
|
||||||
|
|
||||||
|
private final StorageTypeStatsMap statsMap = new StorageTypeStatsMap();
|
||||||
|
|
||||||
private long capacityTotal = 0L;
|
private long capacityTotal = 0L;
|
||||||
private long capacityUsed = 0L;
|
private long capacityUsed = 0L;
|
||||||
private long capacityRemaining = 0L;
|
private long capacityRemaining = 0L;
|
||||||
|
@ -420,6 +434,14 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
}
|
}
|
||||||
cacheCapacity += node.getCacheCapacity();
|
cacheCapacity += node.getCacheCapacity();
|
||||||
cacheUsed += node.getCacheUsed();
|
cacheUsed += node.getCacheUsed();
|
||||||
|
Set<StorageType> storageTypes = new HashSet<>();
|
||||||
|
for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
|
||||||
|
statsMap.addStorage(storageInfo, node);
|
||||||
|
storageTypes.add(storageInfo.getStorageType());
|
||||||
|
}
|
||||||
|
for (StorageType storageType : storageTypes) {
|
||||||
|
statsMap.addNode(storageType, node);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void subtract(final DatanodeDescriptor node) {
|
private void subtract(final DatanodeDescriptor node) {
|
||||||
|
@ -436,6 +458,14 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
}
|
}
|
||||||
cacheCapacity -= node.getCacheCapacity();
|
cacheCapacity -= node.getCacheCapacity();
|
||||||
cacheUsed -= node.getCacheUsed();
|
cacheUsed -= node.getCacheUsed();
|
||||||
|
Set<StorageType> storageTypes = new HashSet<>();
|
||||||
|
for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
|
||||||
|
statsMap.subtractStorage(storageInfo, node);
|
||||||
|
storageTypes.add(storageInfo.getStorageType());
|
||||||
|
}
|
||||||
|
for (StorageType storageType : storageTypes) {
|
||||||
|
statsMap.subtractNode(storageType, node);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Increment expired heartbeat counter. */
|
/** Increment expired heartbeat counter. */
|
||||||
|
@ -443,5 +473,69 @@ class HeartbeatManager implements DatanodeStatistics {
|
||||||
expiredHeartbeats++;
|
expiredHeartbeats++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
/** StorageType specific statistics.
|
||||||
|
* For decommissioning/decommissioned nodes, only used capacity is counted.
|
||||||
|
*/
|
||||||
|
|
||||||
|
static final class StorageTypeStatsMap {
|
||||||
|
|
||||||
|
private Map<StorageType, StorageTypeStats> storageTypeStatsMap =
|
||||||
|
new IdentityHashMap<>();
|
||||||
|
|
||||||
|
private StorageTypeStatsMap() {}
|
||||||
|
|
||||||
|
private StorageTypeStatsMap(StorageTypeStatsMap other) {
|
||||||
|
storageTypeStatsMap =
|
||||||
|
new IdentityHashMap<>(other.storageTypeStatsMap);
|
||||||
|
for (Map.Entry<StorageType, StorageTypeStats> entry :
|
||||||
|
storageTypeStatsMap.entrySet()) {
|
||||||
|
entry.setValue(new StorageTypeStats(entry.getValue()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<StorageType, StorageTypeStats> get() {
|
||||||
|
return Collections.unmodifiableMap(storageTypeStatsMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addNode(StorageType storageType,
|
||||||
|
final DatanodeDescriptor node) {
|
||||||
|
StorageTypeStats storageTypeStats =
|
||||||
|
storageTypeStatsMap.get(storageType);
|
||||||
|
if (storageTypeStats == null) {
|
||||||
|
storageTypeStats = new StorageTypeStats();
|
||||||
|
storageTypeStatsMap.put(storageType, storageTypeStats);
|
||||||
|
}
|
||||||
|
storageTypeStats.addNode(node);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addStorage(final DatanodeStorageInfo info,
|
||||||
|
final DatanodeDescriptor node) {
|
||||||
|
StorageTypeStats storageTypeStats =
|
||||||
|
storageTypeStatsMap.get(info.getStorageType());
|
||||||
|
if (storageTypeStats == null) {
|
||||||
|
storageTypeStats = new StorageTypeStats();
|
||||||
|
storageTypeStatsMap.put(info.getStorageType(), storageTypeStats);
|
||||||
|
}
|
||||||
|
storageTypeStats.addStorage(info, node);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void subtractStorage(final DatanodeStorageInfo info,
|
||||||
|
final DatanodeDescriptor node) {
|
||||||
|
StorageTypeStats storageTypeStats =
|
||||||
|
storageTypeStatsMap.get(info.getStorageType());
|
||||||
|
if (storageTypeStats != null) {
|
||||||
|
storageTypeStats.subtractStorage(info, node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void subtractNode(StorageType storageType,
|
||||||
|
final DatanodeDescriptor node) {
|
||||||
|
StorageTypeStats storageTypeStats =
|
||||||
|
storageTypeStatsMap.get(storageType);
|
||||||
|
if (storageTypeStats != null) {
|
||||||
|
storageTypeStats.subtractNode(node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,115 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import java.beans.ConstructorProperties;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Statistics per StorageType.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class StorageTypeStats {
|
||||||
|
private long capacityTotal = 0L;
|
||||||
|
private long capacityUsed = 0L;
|
||||||
|
private long capacityRemaining = 0L;
|
||||||
|
private long blockPoolUsed = 0L;
|
||||||
|
private int nodesInService = 0;
|
||||||
|
|
||||||
|
@ConstructorProperties({"capacityTotal",
|
||||||
|
"capacityUsed", "capacityRemaining", "blockPoolUsed", "nodesInService"})
|
||||||
|
public StorageTypeStats(long capacityTotal, long capacityUsed,
|
||||||
|
long capacityRemaining, long blockPoolUsed, int nodesInService) {
|
||||||
|
this.capacityTotal = capacityTotal;
|
||||||
|
this.capacityUsed = capacityUsed;
|
||||||
|
this.capacityRemaining = capacityRemaining;
|
||||||
|
this.blockPoolUsed = blockPoolUsed;
|
||||||
|
this.nodesInService = nodesInService;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCapacityTotal() {
|
||||||
|
return capacityTotal;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCapacityUsed() {
|
||||||
|
return capacityUsed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCapacityRemaining() {
|
||||||
|
return capacityRemaining;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getBlockPoolUsed() {
|
||||||
|
return blockPoolUsed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNodesInService() {
|
||||||
|
return nodesInService;
|
||||||
|
}
|
||||||
|
|
||||||
|
StorageTypeStats() {}
|
||||||
|
|
||||||
|
StorageTypeStats(StorageTypeStats other) {
|
||||||
|
capacityTotal = other.capacityTotal;
|
||||||
|
capacityUsed = other.capacityUsed;
|
||||||
|
capacityRemaining = other.capacityRemaining;
|
||||||
|
blockPoolUsed = other.blockPoolUsed;
|
||||||
|
nodesInService = other.nodesInService;
|
||||||
|
}
|
||||||
|
|
||||||
|
void addStorage(final DatanodeStorageInfo info,
|
||||||
|
final DatanodeDescriptor node) {
|
||||||
|
capacityUsed += info.getDfsUsed();
|
||||||
|
blockPoolUsed += info.getBlockPoolUsed();
|
||||||
|
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
|
||||||
|
capacityTotal += info.getCapacity();
|
||||||
|
capacityRemaining += info.getRemaining();
|
||||||
|
} else {
|
||||||
|
capacityTotal += info.getDfsUsed();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void addNode(final DatanodeDescriptor node) {
|
||||||
|
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
|
||||||
|
nodesInService++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void subtractStorage(final DatanodeStorageInfo info,
|
||||||
|
final DatanodeDescriptor node) {
|
||||||
|
capacityUsed -= info.getDfsUsed();
|
||||||
|
blockPoolUsed -= info.getBlockPoolUsed();
|
||||||
|
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
|
||||||
|
capacityTotal -= info.getCapacity();
|
||||||
|
capacityRemaining -= info.getRemaining();
|
||||||
|
} else {
|
||||||
|
capacityTotal -= info.getDfsUsed();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void subtractNode(final DatanodeDescriptor node) {
|
||||||
|
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
|
||||||
|
nodesInService--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,146 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mortbay.util.ajax.JSON;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class for testing {@link BlockStatsMXBean} implementation
|
||||||
|
*/
|
||||||
|
public class TestBlockStatsMXBean {
|
||||||
|
|
||||||
|
private MiniDFSCluster cluster;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws IOException {
|
||||||
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
cluster = null;
|
||||||
|
StorageType[][] types = new StorageType[6][];
|
||||||
|
for (int i=0; i<3; i++) {
|
||||||
|
types[i] = new StorageType[] {StorageType.RAM_DISK, StorageType.DISK};
|
||||||
|
}
|
||||||
|
for (int i=3; i< 5; i++) {
|
||||||
|
types[i] = new StorageType[] {StorageType.RAM_DISK, StorageType.ARCHIVE};
|
||||||
|
}
|
||||||
|
types[5] = new StorageType[] {StorageType.RAM_DISK, StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE};
|
||||||
|
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).
|
||||||
|
storageTypes(types).storagesPerDatanode(3).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStorageTypeStats() throws Exception {
|
||||||
|
Map<StorageType, StorageTypeStats> storageTypeStatsMap =
|
||||||
|
cluster.getNamesystem().getBlockManager().getStorageTypeStats();
|
||||||
|
assertTrue(storageTypeStatsMap.containsKey(StorageType.RAM_DISK));
|
||||||
|
assertTrue(storageTypeStatsMap.containsKey(StorageType.DISK));
|
||||||
|
assertTrue(storageTypeStatsMap.containsKey(StorageType.ARCHIVE));
|
||||||
|
|
||||||
|
StorageTypeStats storageTypeStats =
|
||||||
|
storageTypeStatsMap.get(StorageType.RAM_DISK);
|
||||||
|
assertEquals(6, storageTypeStats.getNodesInService());
|
||||||
|
|
||||||
|
storageTypeStats = storageTypeStatsMap.get(StorageType.DISK);
|
||||||
|
assertEquals(3, storageTypeStats.getNodesInService());
|
||||||
|
|
||||||
|
storageTypeStats = storageTypeStatsMap.get(StorageType.ARCHIVE);
|
||||||
|
assertEquals(3, storageTypeStats.getNodesInService());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static String readOutput(URL url) throws IOException {
|
||||||
|
StringBuilder out = new StringBuilder();
|
||||||
|
InputStream in = url.openConnection().getInputStream();
|
||||||
|
byte[] buffer = new byte[64 * 1024];
|
||||||
|
int len = in.read(buffer);
|
||||||
|
while (len > 0) {
|
||||||
|
out.append(new String(buffer, 0, len));
|
||||||
|
len = in.read(buffer);
|
||||||
|
}
|
||||||
|
return out.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testStorageTypeStatsJMX() throws Exception {
|
||||||
|
URL baseUrl = new URL (cluster.getHttpUri(0));
|
||||||
|
String result = readOutput(new URL(baseUrl, "/jmx"));
|
||||||
|
System.out.println(result);
|
||||||
|
|
||||||
|
Map<String, Object> stat = (Map<String, Object>) JSON.parse(result);
|
||||||
|
Object[] beans =(Object[]) stat.get("beans");
|
||||||
|
Map<String, Object> blockStats = null;
|
||||||
|
for (Object bean : beans) {
|
||||||
|
Map<String, Object> map = (Map<String, Object>) bean;
|
||||||
|
if (map.get("name").equals("Hadoop:service=NameNode,name=BlockStats")) {
|
||||||
|
blockStats = map;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertNotNull(blockStats);
|
||||||
|
Object[] storageTypeStatsList =
|
||||||
|
(Object[])blockStats.get("StorageTypeStats");
|
||||||
|
assertNotNull(storageTypeStatsList);
|
||||||
|
assertEquals (3, storageTypeStatsList.length);
|
||||||
|
|
||||||
|
Set<String> typesPresent = new HashSet<> ();
|
||||||
|
for (Object obj : storageTypeStatsList) {
|
||||||
|
Map<String, Object> entry = (Map<String, Object>)obj;
|
||||||
|
String storageType = (String)entry.get("key");
|
||||||
|
Map<String,Object> storageTypeStats = (Map<String,Object>)entry.get("value");
|
||||||
|
typesPresent.add(storageType);
|
||||||
|
if (storageType.equals("ARCHIVE") || storageType.equals("DISK") ) {
|
||||||
|
assertEquals(3l, storageTypeStats.get("nodesInService"));
|
||||||
|
} else if (storageType.equals("RAM_DISK")) {
|
||||||
|
assertEquals(6l, storageTypeStats.get("nodesInService"));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(typesPresent.contains("ARCHIVE"));
|
||||||
|
assertTrue(typesPresent.contains("DISK"));
|
||||||
|
assertTrue(typesPresent.contains("RAM_DISK"));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue