HDFS-12775. [READ] Fix reporting of Provided volumes
Parent: e1a28f95b8
Commit: 3b1d30301b
@@ -331,7 +331,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
  public static final String DFS_NAMENODE_PROVIDED_ENABLED = "dfs.namenode.provided.enabled";
  public static final boolean DFS_NAMENODE_PROVIDED_ENABLED_DEFAULT = false;

  public static final String DFS_PROVIDER_DF_CLASS = "dfs.provided.df.class";
  public static final String DFS_PROVIDER_STORAGEUUID = "dfs.provided.storage.id";
  public static final String DFS_PROVIDER_STORAGEUUID_DEFAULT = "DS-PROVIDED";
  public static final String DFS_PROVIDED_ALIASMAP_CLASS = "dfs.provided.aliasmap.class";
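The keys above are ordinary Configuration entries. As a rough, hedged illustration of how they would be consumed (not part of this patch; the values are placeholders), a caller of DFSConfigKeys might do:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class ProvidedConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Enable PROVIDED storage handling on the Namenode (illustrative only).
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, true);
    // The reserved storage id defaults to "DS-PROVIDED".
    conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
        DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
    System.out.println(conf.getBoolean(
        DFSConfigKeys.DFS_NAMENODE_PROVIDED_ENABLED, false));
  }
}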
@@ -103,6 +103,8 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.util.FoldedTreeSet;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;

@@ -2402,6 +2404,21 @@ public class BlockManager implements BlockStatsMXBean {
    }
  }

  public long getProvidedCapacity() {
    return providedStorageMap.getCapacity();
  }

  public void updateHeartbeat(DatanodeDescriptor node, StorageReport[] reports,
      long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary) {

    for (StorageReport report: reports) {
      providedStorageMap.updateStorage(node, report.getStorage());
    }
    node.updateHeartbeat(reports, cacheCapacity, cacheUsed, xceiverCount,
        failedVolumes, volumeFailureSummary);
  }

  /**
   * StatefulBlockInfo is used to build the "toUC" list, which is a list of
   * updates to the information about under-construction blocks.

@@ -2463,7 +2480,7 @@ public class BlockManager implements BlockStatsMXBean {
      // !#! Register DN with provided storage, not with storage owned by DN
      // !#! DN should still have a ref to the DNStorageInfo
      DatanodeStorageInfo storageInfo =
          providedStorageMap.getStorage(node, storage, context);
          providedStorageMap.getStorage(node, storage);

      if (storageInfo == null) {
        // We handle this for backwards compatibility.
@@ -449,24 +449,24 @@ public class DatanodeDescriptor extends DatanodeInfo {
    this.volumeFailures = volFailures;
    this.volumeFailureSummary = volumeFailureSummary;
    for (StorageReport report : reports) {
      totalCapacity += report.getCapacity();
      totalRemaining += report.getRemaining();
      totalBlockPoolUsed += report.getBlockPoolUsed();
      totalDfsUsed += report.getDfsUsed();
      totalNonDfsUsed += report.getNonDfsUsed();

      // for PROVIDED storages, do not call updateStorage() unless
      // DatanodeStorageInfo already exists!
      if (StorageType.PROVIDED.equals(report.getStorage().getStorageType())
          && storageMap.get(report.getStorage().getStorageID()) == null) {
        continue;
      }
      DatanodeStorageInfo storage = updateStorage(report.getStorage());
      DatanodeStorageInfo storage =
          storageMap.get(report.getStorage().getStorageID());
      if (checkFailedStorages) {
        failedStorageInfos.remove(storage);
      }

      storage.receivedHeartbeat(report);
      // skip accounting for capacity of PROVIDED storages!
      if (StorageType.PROVIDED.equals(storage.getStorageType())) {
        continue;
      }

      totalCapacity += report.getCapacity();
      totalRemaining += report.getRemaining();
      totalBlockPoolUsed += report.getBlockPoolUsed();
      totalDfsUsed += report.getDfsUsed();
      totalNonDfsUsed += report.getNonDfsUsed();
    }

    // Update total metrics for the node.
@@ -77,4 +77,7 @@ public interface DatanodeStatistics {

  /** @return Storage Tier statistics*/
  Map<StorageType, StorageTypeStats> getStorageTypeStats();

  /** @return the provided capacity */
  public long getProvidedCapacity();
}
@@ -183,7 +183,7 @@ class DatanodeStats {
      StorageTypeStats storageTypeStats =
          storageTypeStatsMap.get(storageType);
      if (storageTypeStats == null) {
        storageTypeStats = new StorageTypeStats();
        storageTypeStats = new StorageTypeStats(storageType);
        storageTypeStatsMap.put(storageType, storageTypeStats);
      }
      storageTypeStats.addNode(node);

@@ -194,7 +194,7 @@ class DatanodeStats {
      StorageTypeStats storageTypeStats =
          storageTypeStatsMap.get(info.getStorageType());
      if (storageTypeStats == null) {
        storageTypeStats = new StorageTypeStats();
        storageTypeStats = new StorageTypeStats(info.getStorageType());
        storageTypeStatsMap.put(info.getStorageType(), storageTypeStats);
      }
      storageTypeStats.addStorage(info, node);
@@ -195,6 +195,11 @@ class HeartbeatManager implements DatanodeStatistics {
    return stats.getStatsMap();
  }

  @Override
  public long getProvidedCapacity() {
    return blockManager.getProvidedCapacity();
  }

  synchronized void register(final DatanodeDescriptor d) {
    if (!d.isAlive()) {
      addDatanode(d);

@@ -232,8 +237,8 @@ class HeartbeatManager implements DatanodeStatistics {
      int xceiverCount, int failedVolumes,
      VolumeFailureSummary volumeFailureSummary) {
    stats.subtract(node);
    node.updateHeartbeat(reports, cacheCapacity, cacheUsed,
        xceiverCount, failedVolumes, volumeFailureSummary);
    blockManager.updateHeartbeat(node, reports, cacheCapacity, cacheUsed,
        xceiverCount, failedVolumes, volumeFailureSummary);
    stats.add(node);
  }

@@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.common.BlockAlias;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;

@@ -72,6 +71,7 @@ public class ProvidedStorageMap {
  private final ProvidedDescriptor providedDescriptor;
  private final DatanodeStorageInfo providedStorageInfo;
  private boolean providedEnabled;
  private long capacity;

  ProvidedStorageMap(RwLock lock, BlockManager bm, Configuration conf)
      throws IOException {

@@ -112,14 +112,13 @@ public class ProvidedStorageMap {
  /**
   * @param dn datanode descriptor
   * @param s data node storage
   * @param context the block report context
   * @return the {@link DatanodeStorageInfo} for the specified datanode.
   * If {@code s} corresponds to a provided storage, the storage info
   * representing provided storage is returned.
   * @throws IOException
   */
  DatanodeStorageInfo getStorage(DatanodeDescriptor dn, DatanodeStorage s,
      BlockReportContext context) throws IOException {
  DatanodeStorageInfo getStorage(DatanodeDescriptor dn, DatanodeStorage s)
      throws IOException {
    if (providedEnabled && storageId.equals(s.getStorageID())) {
      if (StorageType.PROVIDED.equals(s.getStorageType())) {
        if (providedStorageInfo.getState() == State.FAILED

@@ -127,8 +126,10 @@ public class ProvidedStorageMap {
          providedStorageInfo.setState(State.NORMAL);
          LOG.info("Provided storage transitioning to state " + State.NORMAL);
        }
        processProvidedStorageReport(context);
        dn.injectStorage(providedStorageInfo);
        if (dn.getStorageInfo(s.getStorageID()) == null) {
          dn.injectStorage(providedStorageInfo);
        }
        processProvidedStorageReport();
        return providedDescriptor.getProvidedStorage(dn, s);
      }
      LOG.warn("Reserved storage {} reported as non-provided from {}", s, dn);

@@ -136,7 +137,7 @@ public class ProvidedStorageMap {
    return dn.getStorageInfo(s.getStorageID());
  }

  private void processProvidedStorageReport(BlockReportContext context)
  private void processProvidedStorageReport()
      throws IOException {
    assert lock.hasWriteLock() : "Not holding write lock";
    if (providedStorageInfo.getBlockReportCount() == 0

@@ -172,6 +173,26 @@ public class ProvidedStorageMap {
    }
  }

  public long getCapacity() {
    if (providedStorageInfo == null) {
      return 0;
    }
    return providedStorageInfo.getCapacity();
  }

  public void updateStorage(DatanodeDescriptor node, DatanodeStorage storage) {
    if (providedEnabled && storageId.equals(storage.getStorageID())) {
      if (StorageType.PROVIDED.equals(storage.getStorageType())) {
        node.injectStorage(providedStorageInfo);
        return;
      } else {
        LOG.warn("Reserved storage {} reported as non-provided from {}",
            storage, node);
      }
    }
    node.updateStorage(storage);
  }

  /**
   * Builder used for creating {@link LocatedBlocks} when a block is provided.
   */

@@ -295,10 +316,12 @@ public class ProvidedStorageMap {
   * An abstract DatanodeDescriptor to track datanodes with provided storages.
   * NOTE: never resolved through registerDatanode, so not in the topology.
   */
  static class ProvidedDescriptor extends DatanodeDescriptor {
  public static class ProvidedDescriptor extends DatanodeDescriptor {

    private final NavigableMap<String, DatanodeDescriptor> dns =
        new ConcurrentSkipListMap<>();
    public final static String NETWORK_LOCATION = "/REMOTE";
    public final static String NAME = "PROVIDED";

    ProvidedDescriptor() {
      super(new DatanodeID(

@@ -444,6 +467,21 @@ public class ProvidedStorageMap {
    public int hashCode() {
      return super.hashCode();
    }

    @Override
    public String toString() {
      return "PROVIDED-LOCATION";
    }

    @Override
    public String getNetworkLocation() {
      return NETWORK_LOCATION;
    }

    @Override
    public String getName() {
      return NAME;
    }
  }

  /**

@@ -480,7 +518,13 @@ public class ProvidedStorageMap {
      super.setState(state);
    }
  }

    @Override
    public String toString() {
      return "PROVIDED-STORAGE";
    }
  }

  /**
   * Used to emulate block reports for provided blocks.
   */
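The new updateStorage() and getCapacity() above hinge on a single shared DatanodeStorageInfo that every datanode mounting the remote store points at. A stripped-down, self-contained sketch of that dispatch decision, using plain strings instead of the HDFS types (the names here are illustrative, not the real API):

public class ProvidedDispatchSketch {
  // Plays the role of DFS_PROVIDER_STORAGEUUID_DEFAULT in the patch.
  static final String PROVIDED_STORAGE_ID = "DS-PROVIDED";

  // Mirrors the branching in ProvidedStorageMap#updateStorage: a report for
  // the reserved id and PROVIDED type maps to the one shared provided storage;
  // anything else falls through to the datanode's own storage map.
  static String route(boolean providedEnabled, String storageId, String type) {
    if (providedEnabled && PROVIDED_STORAGE_ID.equals(storageId)) {
      if ("PROVIDED".equals(type)) {
        return "inject shared provided DatanodeStorageInfo";
      }
      return "warn: reserved storage id reported as non-provided";
    }
    return "update the datanode's own storage";
  }

  public static void main(String[] args) {
    System.out.println(route(true, "DS-PROVIDED", "PROVIDED"));
    System.out.println(route(true, "sid-1", "DISK"));
  }
}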
@@ -22,6 +22,7 @@ import java.beans.ConstructorProperties;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;

/**
 * Statistics per StorageType.

@@ -36,6 +37,7 @@ public class StorageTypeStats {
  private long capacityRemaining = 0L;
  private long blockPoolUsed = 0L;
  private int nodesInService = 0;
  private StorageType storageType;

  @ConstructorProperties({"capacityTotal", "capacityUsed", "capacityNonDfsUsed",
      "capacityRemaining", "blockPoolUsed", "nodesInService"})

@@ -51,22 +53,47 @@ public class StorageTypeStats {
  }

  public long getCapacityTotal() {
    // for PROVIDED storage, avoid counting the same storage
    // across multiple datanodes
    if (storageType == StorageType.PROVIDED && nodesInService > 0) {
      return capacityTotal/nodesInService;
    }
    return capacityTotal;
  }

  public long getCapacityUsed() {
    // for PROVIDED storage, avoid counting the same storage
    // across multiple datanodes
    if (storageType == StorageType.PROVIDED && nodesInService > 0) {
      return capacityUsed/nodesInService;
    }
    return capacityUsed;
  }

  public long getCapacityNonDfsUsed() {
    // for PROVIDED storage, avoid counting the same storage
    // across multiple datanodes
    if (storageType == StorageType.PROVIDED && nodesInService > 0) {
      return capacityNonDfsUsed/nodesInService;
    }
    return capacityNonDfsUsed;
  }

  public long getCapacityRemaining() {
    // for PROVIDED storage, avoid counting the same storage
    // across multiple datanodes
    if (storageType == StorageType.PROVIDED && nodesInService > 0) {
      return capacityRemaining/nodesInService;
    }
    return capacityRemaining;
  }

  public long getBlockPoolUsed() {
    // for PROVIDED storage, avoid counting the same storage
    // across multiple datanodes
    if (storageType == StorageType.PROVIDED && nodesInService > 0) {
      return blockPoolUsed/nodesInService;
    }
    return blockPoolUsed;
  }

@@ -74,7 +101,9 @@ public class StorageTypeStats {
    return nodesInService;
  }

  StorageTypeStats() {}
  StorageTypeStats(StorageType storageType) {
    this.storageType = storageType;
  }

  StorageTypeStats(StorageTypeStats other) {
    capacityTotal = other.capacityTotal;

@@ -87,6 +116,7 @@ public class StorageTypeStats {

  void addStorage(final DatanodeStorageInfo info,
      final DatanodeDescriptor node) {
    assert storageType == info.getStorageType();
    capacityUsed += info.getDfsUsed();
    capacityNonDfsUsed += info.getNonDfsUsed();
    blockPoolUsed += info.getBlockPoolUsed();

@@ -106,6 +136,7 @@ public class StorageTypeStats {

  void subtractStorage(final DatanodeStorageInfo info,
      final DatanodeDescriptor node) {
    assert storageType == info.getStorageType();
    capacityUsed -= info.getDfsUsed();
    capacityNonDfsUsed -= info.getNonDfsUsed();
    blockPoolUsed -= info.getBlockPoolUsed();
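Every datanode that mounts the remote store reports the same PROVIDED capacity, so the aggregate counters above would otherwise be multiplied by the number of reporting nodes; the getters divide by nodesInService to undo that. A tiny worked example of the arithmetic, with made-up numbers:

public class ProvidedAggregationExample {
  public static void main(String[] args) {
    long providedCapacity = 1024L; // bytes in the shared remote store (example)
    int nodesInService = 4;        // datanodes all reporting the same volume

    // addStorage() runs once per reporting node, so the raw counter
    // accumulates the same capacity four times.
    long capacityTotal = providedCapacity * nodesInService; // 4096

    // getCapacityTotal() for PROVIDED divides back down, as in the patch.
    long reported = capacityTotal / nodesInService;
    System.out.println(reported == providedCapacity); // true
  }
}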
@@ -1,58 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;

/**
 * The default usage statistics for a provided volume.
 */
public class DefaultProvidedVolumeDF
    implements ProvidedVolumeDF, Configurable {

  @Override
  public void setConf(Configuration conf) {
  }

  @Override
  public Configuration getConf() {
    return null;
  }

  @Override
  public long getCapacity() {
    return Long.MAX_VALUE;
  }

  @Override
  public long getSpaceUsed() {
    return 0;
  }

  @Override
  public long getBlockPoolUsed(String bpid) {
    return 0;
  }

  @Override
  public long getAvailable() {
    return Long.MAX_VALUE;
  }
}
@@ -1,34 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

/**
 * This interface is used to define the usage statistics
 * of the provided storage.
 */
public interface ProvidedVolumeDF {

  long getCapacity();

  long getSpaceUsed();

  long getBlockPoolUsed(String bpid);

  long getAvailable();
}
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

@@ -89,6 +90,30 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
    return suffix;
  }

  /**
   * Class to keep track of the capacity usage statistics for provided volumes.
   */
  public static class ProvidedVolumeDF {

    private AtomicLong used = new AtomicLong();

    public long getSpaceUsed() {
      return used.get();
    }

    public void decDfsUsed(long value) {
      used.addAndGet(-value);
    }

    public void incDfsUsed(long value) {
      used.addAndGet(value);
    }

    public long getCapacity() {
      return getSpaceUsed();
    }
  }

  static class ProvidedBlockPoolSlice {
    private ProvidedVolumeImpl providedVolume;

@@ -96,6 +121,8 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
    private Configuration conf;
    private String bpid;
    private ReplicaMap bpVolumeMap;
    private ProvidedVolumeDF df;
    private AtomicLong numOfBlocks = new AtomicLong();

    ProvidedBlockPoolSlice(String bpid, ProvidedVolumeImpl volume,
        Configuration conf) {

@@ -107,6 +134,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
      aliasMap = ReflectionUtils.newInstance(fmt, conf);
      this.conf = conf;
      this.bpid = bpid;
      this.df = new ProvidedVolumeDF();
      bpVolumeMap.initBlockPool(bpid);
      LOG.info("Created alias map using class: " + aliasMap.getClass());
    }

@@ -155,6 +183,8 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
        if (oldReplica == null) {
          volumeMap.add(bpid, newReplica);
          bpVolumeMap.add(bpid, newReplica);
          incrNumBlocks();
          incDfsUsed(region.getBlock().getNumBytes());
        } else {
          throw new IOException("A block with id " + newReplica.getBlockId()
              + " already exists in the volumeMap");

@@ -163,6 +193,10 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
      }
    }

    private void incrNumBlocks() {
      numOfBlocks.incrementAndGet();
    }

    public boolean isEmpty() {
      return bpVolumeMap.replicas(bpid).size() == 0;
    }

@@ -199,6 +233,18 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
        }
      }
    }

    public long getNumOfBlocks() {
      return numOfBlocks.get();
    }

    long getDfsUsed() throws IOException {
      return df.getSpaceUsed();
    }

    void incDfsUsed(long value) {
      df.incDfsUsed(value);
    }
  }

  private URI baseURI;

@@ -217,10 +263,7 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
        "Only provided storages must use ProvidedVolume";

    baseURI = getStorageLocation().getUri();
    Class<? extends ProvidedVolumeDF> dfClass =
        conf.getClass(DFSConfigKeys.DFS_PROVIDER_DF_CLASS,
            DefaultProvidedVolumeDF.class, ProvidedVolumeDF.class);
    df = ReflectionUtils.newInstance(dfClass, conf);
    df = new ProvidedVolumeDF();
    remoteFS = FileSystem.get(baseURI, conf);
  }

@@ -231,34 +274,47 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {

  @Override
  public long getCapacity() {
    if (configuredCapacity < 0) {
      return df.getCapacity();
    try {
      // default to whatever is the space used!
      return getDfsUsed();
    } catch (IOException e) {
      LOG.warn("Exception when trying to get capacity of ProvidedVolume: {}",
          e);
    }
    return configuredCapacity;
    return 0L;
  }

  @Override
  public long getDfsUsed() throws IOException {
    return df.getSpaceUsed();
    long dfsUsed = 0;
    synchronized(getDataset()) {
      for(ProvidedBlockPoolSlice s : bpSlices.values()) {
        dfsUsed += s.getDfsUsed();
      }
    }
    return dfsUsed;
  }

  @Override
  long getBlockPoolUsed(String bpid) throws IOException {
    if (bpSlices.containsKey(bpid)) {
      return df.getBlockPoolUsed(bpid);
    } else {
      throw new IOException("block pool " + bpid + " is not found");
    }
    return getProvidedBlockPoolSlice(bpid).getDfsUsed();
  }

  @Override
  public long getAvailable() throws IOException {
    return df.getAvailable();
    long remaining = getCapacity() - getDfsUsed();
    // do not report less than 0 remaining space for PROVIDED storage
    // to prevent marking it as over capacity on NN
    if (remaining < 0L) {
      LOG.warn("Volume {} has less than 0 available space", this);
      return 0L;
    }
    return remaining;
  }

  @Override
  long getActualNonDfsUsed() throws IOException {
    return df.getSpaceUsed();
    return 0L;
  }

  @Override

@@ -266,6 +322,21 @@ public class ProvidedVolumeImpl extends FsVolumeImpl {
    return 0L;
  }

  @Override
  long getNumBlocks() {
    long numBlocks = 0;
    for (ProvidedBlockPoolSlice s : bpSlices.values()) {
      numBlocks += s.getNumOfBlocks();
    }
    return numBlocks;
  }

  @Override
  void incDfsUsedAndNumBlocks(String bpid, long value) {
    throw new UnsupportedOperationException(
        "ProvidedVolume does not yet support writes");
  }

  @Override
  public URI getBaseURI() {
    return baseURI;
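In the rewritten getAvailable() above, remaining space is derived as capacity minus DFS used and clamped at zero so the Namenode never treats a provided volume as over capacity. A minimal standalone sketch of that clamping (the method name and types are illustrative, not the HDFS API):

public class RemainingSpaceSketch {
  static long available(long capacity, long dfsUsed) {
    long remaining = capacity - dfsUsed;
    // Never report negative remaining space for a PROVIDED volume.
    return remaining < 0L ? 0L : remaining;
  }

  public static void main(String[] args) {
    System.out.println(available(100L, 40L));  // 60
    System.out.println(available(100L, 140L)); // 0, clamped
  }
}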
@@ -64,6 +64,12 @@ public interface FederationMBean {
   */
  long getRemainingCapacity();

  /**
   * Get the total remote storage capacity mounted in the federated cluster.
   * @return Remote capacity of the federated cluster.
   */
  long getProvidedSpace();

  /**
   * Get the number of nameservices in the federation.
   * @return Number of nameservices in the federation.
@@ -271,6 +271,11 @@ public class FederationMetrics implements FederationMBean {
    return getNameserviceAggregatedLong(MembershipStats::getAvailableSpace);
  }

  @Override
  public long getProvidedSpace() {
    return getNameserviceAggregatedLong(MembershipStats::getProvidedSpace);
  }

  @Override
  public long getUsedCapacity() {
    return getTotalCapacity() - getRemainingCapacity();
@@ -168,6 +168,11 @@ public class NamenodeBeanMetrics
    return getFederationMetrics().getTotalCapacity();
  }

  @Override
  public long getProvidedCapacity() {
    return getFederationMetrics().getProvidedSpace();
  }

  @Override
  public String getSafemode() {
    // We assume that the global federated view is never in safe mode

@@ -449,6 +454,11 @@ public class NamenodeBeanMetrics
    return this.getUsed();
  }

  @Override
  public long getProvidedCapacityTotal() {
    return getProvidedCapacity();
  }

  @Override
  public long getFilesTotal() {
    return getFederationMetrics().getNumFiles();
@@ -236,6 +236,7 @@ public class MembershipNamenodeResolver
        report.getNumOfBlocksPendingDeletion());
    stats.setAvailableSpace(report.getAvailableSpace());
    stats.setTotalSpace(report.getTotalSpace());
    stats.setProvidedSpace(report.getProvidedSpace());
    stats.setNumOfDecommissioningDatanodes(
        report.getNumDecommissioningDatanodes());
    stats.setNumOfActiveDatanodes(report.getNumLiveDatanodes());
@@ -58,6 +58,7 @@ public class NamenodeStatusReport {
  private long numOfBlocksUnderReplicated = -1;
  private long numOfBlocksPendingDeletion = -1;
  private long totalSpace = -1;
  private long providedSpace = -1;

  /** If the fields are valid. */
  private boolean registrationValid = false;

@@ -296,7 +297,7 @@ public class NamenodeStatusReport {
  public void setNamesystemInfo(long available, long total,
      long numFiles, long numBlocks, long numBlocksMissing,
      long numBlocksPendingReplication, long numBlocksUnderReplicated,
      long numBlocksPendingDeletion) {
      long numBlocksPendingDeletion, long providedSpace) {
    this.totalSpace = total;
    this.availableSpace = available;
    this.numOfBlocks = numBlocks;

@@ -306,6 +307,7 @@ public class NamenodeStatusReport {
    this.numOfBlocksPendingDeletion = numBlocksPendingDeletion;
    this.numOfFiles = numFiles;
    this.statsValid = true;
    this.providedSpace = providedSpace;
  }

  /**

@@ -344,6 +346,14 @@ public class NamenodeStatusReport {
    return this.availableSpace;
  }

  /**
   * Get the space occupied by provided storage.
   *
   * @return the provided capacity.
   */
  public long getProvidedSpace() {
    return this.providedSpace;
  }

  /**
   * Get the number of missing blocks.
   *
@@ -350,7 +350,8 @@ public class NamenodeHeartbeatService extends PeriodicService {
            jsonObject.getLong("MissingBlocks"),
            jsonObject.getLong("PendingReplicationBlocks"),
            jsonObject.getLong("UnderReplicatedBlocks"),
            jsonObject.getLong("PendingDeletionBlocks"));
            jsonObject.getLong("PendingDeletionBlocks"),
            jsonObject.getLong("ProvidedCapacityTotal"));
      }
    }
  }
@@ -45,6 +45,10 @@ public abstract class MembershipStats extends BaseRecord {

  public abstract long getAvailableSpace();

  public abstract void setProvidedSpace(long capacity);

  public abstract long getProvidedSpace();

  public abstract void setNumOfFiles(long files);

  public abstract long getNumOfFiles();
@@ -77,6 +77,16 @@ public class MembershipStatsPBImpl extends MembershipStats
    return this.translator.getProtoOrBuilder().getAvailableSpace();
  }

  @Override
  public void setProvidedSpace(long capacity) {
    this.translator.getBuilder().setProvidedSpace(capacity);
  }

  @Override
  public long getProvidedSpace() {
    return this.translator.getProtoOrBuilder().getProvidedSpace();
  }

  @Override
  public void setNumOfFiles(long files) {
    this.translator.getBuilder().setNumOfFiles(files);
@@ -4166,6 +4166,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    return datanodeStatistics.getCapacityRemaining();
  }

  @Override // FSNamesystemMBean
  @Metric({"ProvidedCapacityTotal",
      "Total space used in PROVIDED storage in bytes" })
  public long getProvidedCapacityTotal() {
    return datanodeStatistics.getProvidedCapacity();
  }

  @Metric({"CapacityRemainingGB", "Remaining capacity in GB"})
  public float getCapacityRemainingGB() {
    return DFSUtil.roundBytesToGB(getCapacityRemaining());

@@ -5729,6 +5736,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    return this.getCapacityTotal();
  }

  @Override // NameNodeMXBean
  public long getProvidedCapacity() {
    return this.getProvidedCapacityTotal();
  }

  @Override // NameNodeMXBean
  public String getSafemode() {
    if (!this.isInSafeMode())
@@ -65,8 +65,14 @@ public interface NameNodeMXBean {
   * @return the total raw bytes including non-dfs used space
   */
  public long getTotal();

  /**
   * Gets capacity of the provided storage mounted, in bytes.
   *
   * @return the total raw bytes present in the provided storage.
   */
  public long getProvidedCapacity();

  /**
   * Gets the safemode status
   *
@@ -69,7 +69,12 @@ public interface FSNamesystemMBean {
   * @return - used capacity in bytes
   */
  public long getCapacityUsed();

  /**
   * Total PROVIDED storage capacity.
   * @return - total PROVIDED storage capacity in bytes
   */
  public long getProvidedCapacityTotal();

  /**
   * Total number of files and directories
@@ -30,6 +30,7 @@ package hadoop.hdfs;
message NamenodeMembershipStatsRecordProto {
  optional uint64 totalSpace = 1;
  optional uint64 availableSpace = 2;
  optional uint64 providedSpace = 3;

  optional uint64 numOfFiles = 10;
  optional uint64 numOfBlocks = 11;
@@ -4629,14 +4629,6 @@
  </description>
</property>

<property>
  <name>dfs.provided.df.class</name>
  <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.DefaultProvidedVolumeDF</value>
  <description>
    The class that is used to measure usage statistics of provided stores.
  </description>
</property>

<property>
  <name>dfs.provided.storage.id</name>
  <value>DS-PROVIDED</value>
@@ -162,6 +162,7 @@
{#nn}
<table class="table table-bordered table-striped">
  <tr><th> Configured Capacity:</th><td>{Total|fmt_bytes}</td></tr>
  <tr><th> Configured Remote Capacity:</th><td>{ProvidedCapacity|fmt_bytes}</td></tr>
  <tr><th> DFS Used:</th><td>{Used|fmt_bytes} ({PercentUsed|fmt_percentage})</td></tr>
  <tr><th> Non DFS Used:</th><td>{NonDfsUsedSpace|fmt_bytes}</td></tr>
  <tr><th> DFS Remaining:</th><td>{Free|fmt_bytes} ({PercentRemaining|fmt_percentage})</td></tr>
@@ -63,15 +63,15 @@ public class TestProvidedStorageMap {

  private DatanodeDescriptor createDatanodeDescriptor(int port) {
    return DFSTestUtil.getDatanodeDescriptor("127.0.0.1", port, "defaultRack",
        "localhost");
        "localhost");
  }

  @Test
  public void testProvidedStorageMap() throws IOException {
    ProvidedStorageMap providedMap = new ProvidedStorageMap(
        nameSystemLock, bm, conf);
        nameSystemLock, bm, conf);
    DatanodeStorageInfo providedMapStorage =
        providedMap.getProvidedStorageInfo();
        providedMap.getProvidedStorageInfo();
    //the provided storage cannot be null
    assertNotNull(providedMapStorage);

@@ -80,41 +80,40 @@ public class TestProvidedStorageMap {

    //associate two storages to the datanode
    DatanodeStorage dn1ProvidedStorage = new DatanodeStorage(
        providedStorageID,
        DatanodeStorage.State.NORMAL,
        StorageType.PROVIDED);
        providedStorageID,
        DatanodeStorage.State.NORMAL,
        StorageType.PROVIDED);
    DatanodeStorage dn1DiskStorage = new DatanodeStorage(
        "sid-1", DatanodeStorage.State.NORMAL, StorageType.DISK);
        "sid-1", DatanodeStorage.State.NORMAL, StorageType.DISK);

    when(nameSystemLock.hasWriteLock()).thenReturn(true);
    DatanodeStorageInfo dns1Provided = providedMap.getStorage(dn1,
        dn1ProvidedStorage, null);
    DatanodeStorageInfo dns1Disk = providedMap.getStorage(dn1,
        dn1DiskStorage, null);
    DatanodeStorageInfo dns1Provided =
        providedMap.getStorage(dn1, dn1ProvidedStorage);
    DatanodeStorageInfo dns1Disk = providedMap.getStorage(dn1, dn1DiskStorage);

    assertTrue("The provided storages should be equal",
        dns1Provided == providedMapStorage);
        dns1Provided == providedMapStorage);
    assertTrue("Disk storage has not yet been registered with block manager",
        dns1Disk == null);
        dns1Disk == null);
    //add the disk storage to the datanode.
    DatanodeStorageInfo dnsDisk = new DatanodeStorageInfo(dn1, dn1DiskStorage);
    dn1.injectStorage(dnsDisk);
    assertTrue("Disk storage must match the injected storage info",
        dnsDisk == providedMap.getStorage(dn1, dn1DiskStorage, null));
        dnsDisk == providedMap.getStorage(dn1, dn1DiskStorage));

    //create a 2nd datanode
    DatanodeDescriptor dn2 = createDatanodeDescriptor(5010);
    //associate a provided storage with the datanode
    DatanodeStorage dn2ProvidedStorage = new DatanodeStorage(
        providedStorageID,
        DatanodeStorage.State.NORMAL,
        StorageType.PROVIDED);
        providedStorageID,
        DatanodeStorage.State.NORMAL,
        StorageType.PROVIDED);

    DatanodeStorageInfo dns2Provided = providedMap.getStorage(
        dn2, dn2ProvidedStorage, null);
        dn2, dn2ProvidedStorage);
    assertTrue("The provided storages should be equal",
        dns2Provided == providedMapStorage);
        dns2Provided == providedMapStorage);
    assertTrue("The DatanodeDescriptor should contain the provided storage",
        dn2.getStorageInfo(providedStorageID) == providedMapStorage);
        dn2.getStorageInfo(providedStorageID) == providedMapStorage);
  }
}
@@ -46,7 +46,6 @@ import java.util.Map;
import java.util.Set;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;

@@ -102,6 +101,7 @@ public class TestProvidedImpl {
  private FsDatasetImpl dataset;
  private static Map<Long, String> blkToPathMap;
  private static List<FsVolumeImpl> providedVolumes;
  private static long spaceUsed = 0;

  /**
   * A simple FileRegion iterator for tests.

@@ -142,6 +142,7 @@ public class TestProvidedImpl {
        }
        writer.flush();
        writer.close();
        spaceUsed += BLK_LEN;
      } catch (IOException e) {
        e.printStackTrace();
      }

@@ -240,39 +241,6 @@ public class TestProvidedImpl {
    }
  }

  public static class TestProvidedVolumeDF
      implements ProvidedVolumeDF, Configurable {

    @Override
    public void setConf(Configuration conf) {
    }

    @Override
    public Configuration getConf() {
      return null;
    }

    @Override
    public long getCapacity() {
      return Long.MAX_VALUE;
    }

    @Override
    public long getSpaceUsed() {
      return -1;
    }

    @Override
    public long getBlockPoolUsed(String bpid) {
      return -1;
    }

    @Override
    public long getAvailable() {
      return Long.MAX_VALUE;
    }
  }

  private static Storage.StorageDirectory createLocalStorageDirectory(
      File root, Configuration conf)
      throws SecurityException, IOException {

@@ -370,6 +338,8 @@ public class TestProvidedImpl {
    when(datanode.getConf()).thenReturn(conf);
    final DNConf dnConf = new DNConf(datanode);
    when(datanode.getDnConf()).thenReturn(dnConf);
    // reset the space used
    spaceUsed = 0;

    final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
    when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);

@@ -379,8 +349,6 @@ public class TestProvidedImpl {

    this.conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
        TestFileRegionBlockAliasMap.class, BlockAliasMap.class);
    conf.setClass(DFSConfigKeys.DFS_PROVIDER_DF_CLASS,
        TestProvidedVolumeDF.class, ProvidedVolumeDF.class);

    blkToPathMap = new HashMap<Long, String>();
    providedVolumes = new LinkedList<FsVolumeImpl>();

@@ -410,8 +378,6 @@ public class TestProvidedImpl {
    assertEquals(NUM_PROVIDED_INIT_VOLUMES, providedVolumes.size());
    assertEquals(0, dataset.getNumFailedVolumes());

    TestProvidedVolumeDF df = new TestProvidedVolumeDF();

    for (int i = 0; i < providedVolumes.size(); i++) {
      //check basic information about provided volume
      assertEquals(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT,

@@ -419,18 +385,17 @@ public class TestProvidedImpl {
      assertEquals(StorageType.PROVIDED,
          providedVolumes.get(i).getStorageType());

      long space = providedVolumes.get(i).getBlockPoolUsed(
          BLOCK_POOL_IDS[CHOSEN_BP_ID]);
      //check the df stats of the volume
      assertEquals(df.getAvailable(), providedVolumes.get(i).getAvailable());
      assertEquals(df.getBlockPoolUsed(BLOCK_POOL_IDS[CHOSEN_BP_ID]),
          providedVolumes.get(i).getBlockPoolUsed(
              BLOCK_POOL_IDS[CHOSEN_BP_ID]));
      assertEquals(spaceUsed, space);
      assertEquals(NUM_PROVIDED_BLKS, providedVolumes.get(i).getNumBlocks());

      providedVolumes.get(i).shutdownBlockPool(
          BLOCK_POOL_IDS[1 - CHOSEN_BP_ID], null);
      try {
        assertEquals(df.getBlockPoolUsed(BLOCK_POOL_IDS[1 - CHOSEN_BP_ID]),
            providedVolumes.get(i).getBlockPoolUsed(
                BLOCK_POOL_IDS[1 - CHOSEN_BP_ID]));
        assertEquals(0, providedVolumes.get(i)
            .getBlockPoolUsed(BLOCK_POOL_IDS[1 - CHOSEN_BP_ID]));
        //should not be triggered
        assertTrue(false);
      } catch (IOException e) {
@@ -190,6 +190,8 @@ public class TestFederationMetrics extends TestMetricsBase {
          json.getLong("numOfDecomActiveDatanodes"));
      assertEquals(stats.getNumOfDecomDeadDatanodes(),
          json.getLong("numOfDecomDeadDatanodes"));
      assertEquals(stats.getProvidedSpace(),
          json.getLong("providedSpace"));
      nameservicesFound++;
    }
    assertEquals(getNameservices().size(), nameservicesFound);
@@ -27,6 +27,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Iterator;
import java.util.Random;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.conf.Configuration;

@@ -44,13 +45,23 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.ProvidedStorageMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.net.NodeBase;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;

@@ -59,6 +70,7 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.net.NodeBase.PATH_SEPARATOR_STR;
import static org.junit.Assert.*;

public class TestNameNodeProvidedImplementation {

@@ -79,6 +91,7 @@ public class TestNameNodeProvidedImplementation {
  private final String filePrefix = "file";
  private final String fileSuffix = ".dat";
  private final int baseFileLen = 1024;
  private long providedDataSize = 0;

  Configuration conf;
  MiniDFSCluster cluster;

@@ -135,6 +148,7 @@ public class TestNameNodeProvidedImplementation {
        }
        writer.flush();
        writer.close();
        providedDataSize += newFile.length();
      } catch (IOException e) {
        e.printStackTrace();
      }

@@ -206,13 +220,14 @@ public class TestNameNodeProvidedImplementation {
    cluster.waitActive();
  }

  @Test(timeout = 20000)
  @Test(timeout=20000)
  public void testLoadImage() throws Exception {
    final long seed = r.nextLong();
    LOG.info("NAMEPATH: " + NAMEPATH);
    createImage(new RandomTreeWalk(seed), NNDIRPATH, FixedBlockResolver.class);
    startCluster(NNDIRPATH, 0, new StorageType[] {StorageType.PROVIDED},
        null, false);
    startCluster(NNDIRPATH, 0,
        new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null,
        false);

    FileSystem fs = cluster.getFileSystem();
    for (TreePath e : new RandomTreeWalk(seed)) {

@@ -231,14 +246,83 @@ public class TestNameNodeProvidedImplementation {
    }
  }

  @Test(timeout=20000)
  public void testBlockLoad() throws Exception {
  @Test(timeout=30000)
  public void testProvidedReporting() throws Exception {
    conf.setClass(ImageWriter.Options.UGI_CLASS,
        SingleUGIResolver.class, UGIResolver.class);
    createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
        FixedBlockResolver.class);
    startCluster(NNDIRPATH, 1, new StorageType[] {StorageType.PROVIDED},
        null, false);
    int numDatanodes = 10;
    startCluster(NNDIRPATH, numDatanodes,
        new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null,
        false);
    long diskCapacity = 1000;
    // set the DISK capacity for testing
    for (DataNode dn: cluster.getDataNodes()) {
      for (FsVolumeSpi ref : dn.getFSDataset().getFsVolumeReferences()) {
        if (ref.getStorageType() == StorageType.DISK) {
          ((FsVolumeImpl) ref).setCapacityForTesting(diskCapacity);
        }
      }
    }
    // trigger heartbeats to update the capacities
    cluster.triggerHeartbeats();
    Thread.sleep(10000);
    // verify namenode stats
    FSNamesystem namesystem = cluster.getNameNode().getNamesystem();
    DatanodeStatistics dnStats = namesystem.getBlockManager()
        .getDatanodeManager().getDatanodeStatistics();

    // total capacity reported includes only the local volumes and
    // not the provided capacity
    assertEquals(diskCapacity * numDatanodes, namesystem.getTotal());

    // total storage used should be equal to the totalProvidedStorage
    // no capacity should be remaining!
    assertEquals(providedDataSize, dnStats.getProvidedCapacity());
    assertEquals(providedDataSize, namesystem.getProvidedCapacityTotal());
    assertEquals(providedDataSize, dnStats.getStorageTypeStats()
        .get(StorageType.PROVIDED).getCapacityTotal());
    assertEquals(providedDataSize, dnStats.getStorageTypeStats()
        .get(StorageType.PROVIDED).getCapacityUsed());

    // verify datanode stats
    for (DataNode dn: cluster.getDataNodes()) {
      for (StorageReport report : dn.getFSDataset()
          .getStorageReports(namesystem.getBlockPoolId())) {
        if (report.getStorage().getStorageType() == StorageType.PROVIDED) {
          assertEquals(providedDataSize, report.getCapacity());
          assertEquals(providedDataSize, report.getDfsUsed());
          assertEquals(providedDataSize, report.getBlockPoolUsed());
          assertEquals(0, report.getNonDfsUsed());
          assertEquals(0, report.getRemaining());
        }
      }
    }

    DFSClient client = new DFSClient(new InetSocketAddress("localhost",
        cluster.getNameNodePort()), cluster.getConfiguration(0));
    BlockManager bm = namesystem.getBlockManager();
    for (int fileId = 0; fileId < numFiles; fileId++) {
      String filename = "/" + filePrefix + fileId + fileSuffix;
      LocatedBlocks locatedBlocks = client.getLocatedBlocks(
          filename, 0, baseFileLen);
      for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
        BlockInfo blockInfo =
            bm.getStoredBlock(locatedBlock.getBlock().getLocalBlock());
        Iterator<DatanodeStorageInfo> storagesItr = blockInfo.getStorageInfos();

        DatanodeStorageInfo info = storagesItr.next();
        assertEquals(StorageType.PROVIDED, info.getStorageType());
        DatanodeDescriptor dnDesc = info.getDatanodeDescriptor();
        // check the locations that are returned by FSCK have the right name
        assertEquals(ProvidedStorageMap.ProvidedDescriptor.NETWORK_LOCATION
            + PATH_SEPARATOR_STR + ProvidedStorageMap.ProvidedDescriptor.NAME,
            NodeBase.getPath(dnDesc));
        // no DatanodeStorageInfos should remain
        assertFalse(storagesItr.hasNext());
      }
    }
  }

  @Test(timeout=500000)

@@ -250,8 +334,8 @@ public class TestNameNodeProvidedImplementation {
    // make the last Datanode with only DISK
    startCluster(NNDIRPATH, 3, null,
        new StorageType[][] {
            {StorageType.PROVIDED},
            {StorageType.PROVIDED},
            {StorageType.PROVIDED, StorageType.DISK},
            {StorageType.PROVIDED, StorageType.DISK},
            {StorageType.DISK}},
        false);
    // wait for the replication to finish

@@ -308,8 +392,9 @@ public class TestNameNodeProvidedImplementation {
        FsUGIResolver.class, UGIResolver.class);
    createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
        FixedBlockResolver.class);
    startCluster(NNDIRPATH, 3, new StorageType[] {StorageType.PROVIDED},
        null, false);
    startCluster(NNDIRPATH, 3,
        new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null,
        false);
    FileSystem fs = cluster.getFileSystem();
    Thread.sleep(2000);
    int count = 0;

@@ -371,7 +456,7 @@ public class TestNameNodeProvidedImplementation {
    return fs.getFileBlockLocations(path, 0, fileLen);
  }

  @Test
  @Test(timeout=30000)
  public void testClusterWithEmptyImage() throws IOException {
    // start a cluster with 2 datanodes without any provided storage
    startCluster(NNDIRPATH, 2, null,

@@ -404,7 +489,7 @@ public class TestNameNodeProvidedImplementation {
   * Tests setting replication of provided files.
   * @throws Exception
   */
  @Test
  @Test(timeout=30000)
  public void testSetReplicationForProvidedFiles() throws Exception {
    createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
        FixedBlockResolver.class);

@@ -441,14 +526,14 @@ public class TestNameNodeProvidedImplementation {
    getAndCheckBlockLocations(client, filename, newReplication);
  }

  @Test
  @Test(timeout=30000)
  public void testProvidedDatanodeFailures() throws Exception {
    createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
        FixedBlockResolver.class);
    startCluster(NNDIRPATH, 3, null,
        new StorageType[][] {
            {StorageType.PROVIDED},
            {StorageType.PROVIDED},
            {StorageType.PROVIDED, StorageType.DISK},
            {StorageType.PROVIDED, StorageType.DISK},
            {StorageType.DISK}},
        false);

@@ -511,7 +596,7 @@ public class TestNameNodeProvidedImplementation {
    // 2 Datanodes, 1 PROVIDED and other DISK
    startCluster(NNDIRPATH, 2, null,
        new StorageType[][] {
            {StorageType.PROVIDED},
            {StorageType.PROVIDED, StorageType.DISK},
            {StorageType.DISK}},
        false);

@@ -540,7 +625,7 @@ public class TestNameNodeProvidedImplementation {
    // 2 Datanodes, 1 PROVIDED and other DISK
    startCluster(NNDIRPATH, 2, null,
        new StorageType[][] {
            {StorageType.PROVIDED},
            {StorageType.PROVIDED, StorageType.DISK},
            {StorageType.DISK}},
        false);

@@ -570,7 +655,7 @@ public class TestNameNodeProvidedImplementation {
    }
  }

  @Test
  @Test(timeout=30000)
  public void testSetClusterID() throws Exception {
    String clusterID = "PROVIDED-CLUSTER";
    createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,

@@ -578,7 +663,7 @@ public class TestNameNodeProvidedImplementation {
    // 2 Datanodes, 1 PROVIDED and other DISK
    startCluster(NNDIRPATH, 2, null,
        new StorageType[][] {
            {StorageType.PROVIDED},
            {StorageType.PROVIDED, StorageType.DISK},
            {StorageType.DISK}},
        false);
    NameNode nn = cluster.getNameNode();