HDFS-9038. DFS reserved space is erroneously counted towards non-DFS used. (Brahma Reddy Battula)
Commit 162ee0f0a4 (parent e84b5c5e3e).
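In outline, the bug this patch fixes: a volume's DFS capacity already excludes the configured dfs.datanode.du.reserved space, but the old code derived non-DFS used as capacity - dfsUsed - remaining, and remaining had the reserve subtracted from it a second time, so the reserve surfaced as phantom non-DFS usage in reports. The patch has each volume measure non-DFS usage directly (disk usage minus DFS usage, less the reserve) and carries the value through StorageReport, the protobuf messages, and DatanodeInfo, with a legacy fallback for older datanodes. A minimal, self-contained sketch of the two calculations (illustrative numbers, not taken from the patch):

public class NonDfsUsedSketch {
  public static void main(String[] args) {
    long diskCapacity = 1000L;     // raw disk size
    long reserved = 100L;          // dfs.datanode.du.reserved
    long dfsUsed = 200L;           // HDFS block data on the disk
    long actualNonDfsUsed = 300L;  // everything else on the disk

    long capacity = diskCapacity - reserved;                      // 900
    long dfAvailable = diskCapacity - dfsUsed - actualNonDfsUsed; // 500

    // Old accounting: the reserve is subtracted from available a second
    // time, and non-DFS used is then derived from the other three numbers.
    long oldRemaining = dfAvailable - reserved;              // 400
    long oldNonDfsUsed = capacity - dfsUsed - oldRemaining;  // 300

    // New accounting: only usage beyond the reserve counts as non-DFS used.
    long newNonDfsUsed = Math.max(actualNonDfsUsed - reserved, 0L); // 200

    System.out.println("old nonDfsUsed = " + oldNonDfsUsed); // 300
    System.out.println("new nonDfsUsed = " + newNonDfsUsed); // 200
  }
}

Of the old figure, 100 bytes are simply the reserve being counted a second time.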
DatanodeInfo.java

@@ -43,6 +43,7 @@ import static org.apache.hadoop.hdfs.DFSUtilClient.percent2String;
 public class DatanodeInfo extends DatanodeID implements Node {
   private long capacity;
   private long dfsUsed;
+  private long nonDfsUsed;
   private long remaining;
   private long blockPoolUsed;
   private long cacheCapacity;
@@ -86,6 +87,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
     super(from);
     this.capacity = from.getCapacity();
     this.dfsUsed = from.getDfsUsed();
+    this.nonDfsUsed = from.getNonDfsUsed();
     this.remaining = from.getRemaining();
     this.blockPoolUsed = from.getBlockPoolUsed();
     this.cacheCapacity = from.getCacheCapacity();
@@ -102,6 +104,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
     super(nodeID);
     this.capacity = 0L;
     this.dfsUsed = 0L;
+    this.nonDfsUsed = 0L;
     this.remaining = 0L;
     this.blockPoolUsed = 0L;
     this.cacheCapacity = 0L;
@@ -155,10 +158,26 @@ public class DatanodeInfo extends DatanodeID implements Node {
       final int xceiverCount, final String networkLocation,
       final AdminStates adminState,
       final String upgradeDomain) {
-    super(ipAddr, hostName, datanodeUuid, xferPort, infoPort,
-          infoSecurePort, ipcPort);
+    this(ipAddr, hostName, datanodeUuid, xferPort, infoPort, infoSecurePort,
+        ipcPort, capacity, dfsUsed, 0L, remaining, blockPoolUsed,
+        cacheCapacity, cacheUsed, lastUpdate, lastUpdateMonotonic,
+        xceiverCount, networkLocation, adminState, upgradeDomain);
+  }
+
+  /** Constructor. */
+  public DatanodeInfo(final String ipAddr, final String hostName,
+      final String datanodeUuid, final int xferPort, final int infoPort,
+      final int infoSecurePort, final int ipcPort, final long capacity,
+      final long dfsUsed, final long nonDfsUsed, final long remaining,
+      final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
+      final long lastUpdate, final long lastUpdateMonotonic,
+      final int xceiverCount, final String networkLocation,
+      final AdminStates adminState, final String upgradeDomain) {
+    super(ipAddr, hostName, datanodeUuid, xferPort, infoPort, infoSecurePort,
+        ipcPort);
     this.capacity = capacity;
     this.dfsUsed = dfsUsed;
+    this.nonDfsUsed = nonDfsUsed;
     this.remaining = remaining;
     this.blockPoolUsed = blockPoolUsed;
     this.cacheCapacity = cacheCapacity;
@@ -171,7 +190,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
     this.upgradeDomain = upgradeDomain;
   }
 
-  /** Network location name */
+  /** Network location name. */
   @Override
   public String getName() {
     return getXferAddr();
@@ -188,8 +207,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
 
   /** The used space by the data node. */
   public long getNonDfsUsed() {
-    long nonDFSUsed = capacity - dfsUsed - remaining;
-    return nonDFSUsed < 0 ? 0 : nonDFSUsed;
+    return nonDfsUsed;
   }
 
   /** The used space by the data node as percentage of present capacity */
@@ -279,6 +297,11 @@ public class DatanodeInfo extends DatanodeID implements Node {
     this.dfsUsed = dfsUsed;
   }
 
+  /** Sets the nondfs-used space for the datanode. */
+  public void setNonDfsUsed(long nonDfsUsed) {
+    this.nonDfsUsed = nonDfsUsed;
+  }
+
   /** Sets raw free space. */
   public void setRemaining(long remaining) {
     this.remaining = remaining;
PBHelperClient.java

@@ -280,6 +280,7 @@ public class PBHelperClient {
         .setId(convert((DatanodeID) info))
         .setCapacity(info.getCapacity())
         .setDfsUsed(info.getDfsUsed())
+        .setNonDfsUsed(info.getNonDfsUsed())
         .setRemaining(info.getRemaining())
         .setBlockPoolUsed(info.getBlockPoolUsed())
         .setCacheCapacity(info.getCacheCapacity())
@@ -543,15 +544,24 @@ public class PBHelperClient {
   }
 
   static public DatanodeInfo convert(DatanodeInfoProto di) {
-    if (di == null) return null;
-    return new DatanodeInfo(
-        convert(di.getId()),
-        di.hasLocation() ? di.getLocation() : null,
-        di.getCapacity(), di.getDfsUsed(), di.getRemaining(),
-        di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(),
-        di.getLastUpdate(), di.getLastUpdateMonotonic(),
-        di.getXceiverCount(), convert(di.getAdminState()),
-        di.hasUpgradeDomain() ? di.getUpgradeDomain() : null);
+    if (di == null) {
+      return null;
+    }
+    DatanodeInfo dinfo = new DatanodeInfo(convert(di.getId()),
+        di.hasLocation() ? di.getLocation() : null, di.getCapacity(),
+        di.getDfsUsed(), di.getRemaining(), di.getBlockPoolUsed(),
+        di.getCacheCapacity(), di.getCacheUsed(), di.getLastUpdate(),
+        di.getLastUpdateMonotonic(), di.getXceiverCount(),
+        convert(di.getAdminState()),
+        di.hasUpgradeDomain() ? di.getUpgradeDomain() : null);
+    if (di.hasNonDfsUsed()) {
+      dinfo.setNonDfsUsed(di.getNonDfsUsed());
+    } else {
+      // use the legacy way for older datanodes
+      long nonDFSUsed = di.getCapacity() - di.getDfsUsed() - di.getRemaining();
+      dinfo.setNonDfsUsed(nonDFSUsed < 0 ? 0 : nonDFSUsed);
+    }
+    return dinfo;
   }
 
   public static StorageType[] convertStorageTypes(
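The else branch above keeps heartbeats meaningful during a rolling upgrade: a datanode that predates this patch never sets the new proto field, so the namenode falls back to the old derivation for that node only. The rule in isolation, as a small self-contained sketch:

public class LegacyNonDfsUsed {
  /** Fallback estimate for reports that predate the nonDfsUsed field. */
  static long legacyNonDfsUsed(long capacity, long dfsUsed, long remaining) {
    long nonDfsUsed = capacity - dfsUsed - remaining;
    return nonDfsUsed < 0 ? 0 : nonDfsUsed;  // clamp, as the old getter did
  }

  public static void main(String[] args) {
    System.out.println(legacyNonDfsUsed(900L, 200L, 400L)); // 300
  }
}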
@@ -1454,12 +1464,12 @@ public class PBHelperClient {
   }
 
   public static StorageReport convert(StorageReportProto p) {
-    return new StorageReport(
-        p.hasStorage() ?
-            convert(p.getStorage()) :
-            new DatanodeStorage(p.getStorageUuid()),
-        p.getFailed(), p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
-        p.getBlockPoolUsed());
+    long nonDfsUsed = p.hasNonDfsUsed() ? p.getNonDfsUsed() : p.getCapacity()
+        - p.getDfsUsed() - p.getRemaining();
+    return new StorageReport(p.hasStorage() ? convert(p.getStorage())
+        : new DatanodeStorage(p.getStorageUuid()), p.getFailed(),
+        p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
+        p.getBlockPoolUsed(), nonDfsUsed);
   }
 
   public static DatanodeStorage convert(DatanodeStorageProto s) {
@@ -2025,7 +2035,8 @@ public class PBHelperClient {
         .setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())
         .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
         .setStorageUuid(r.getStorage().getStorageID())
-        .setStorage(convert(r.getStorage()));
+        .setStorage(convert(r.getStorage()))
+        .setNonDfsUsed(r.getNonDfsUsed());
     return builder.build();
   }
 
StorageReport.java

@@ -25,17 +25,19 @@ public class StorageReport {
   private final boolean failed;
   private final long capacity;
   private final long dfsUsed;
+  private final long nonDfsUsed;
   private final long remaining;
   private final long blockPoolUsed;
 
   public static final StorageReport[] EMPTY_ARRAY = {};
 
-  public StorageReport(DatanodeStorage storage, boolean failed,
-      long capacity, long dfsUsed, long remaining, long bpUsed) {
+  public StorageReport(DatanodeStorage storage, boolean failed, long capacity,
+      long dfsUsed, long remaining, long bpUsed, long nonDfsUsed) {
     this.storage = storage;
     this.failed = failed;
     this.capacity = capacity;
     this.dfsUsed = dfsUsed;
+    this.nonDfsUsed = nonDfsUsed;
     this.remaining = remaining;
     this.blockPoolUsed = bpUsed;
   }
@@ -56,6 +58,10 @@ public class StorageReport {
     return dfsUsed;
   }
 
+  public long getNonDfsUsed() {
+    return nonDfsUsed;
+  }
+
   public long getRemaining() {
     return remaining;
   }
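For illustration, a report built with the widened constructor; the values and storage id here are hypothetical, and the argument order follows the new signature (storage, failed, capacity, dfsUsed, remaining, blockPoolUsed, nonDfsUsed):

import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;

public class StorageReportExample {
  public static void main(String[] args) {
    StorageReport report = new StorageReport(
        new DatanodeStorage("DS-example"), // hypothetical storage uuid
        false,                             // failed
        900L,                              // capacity
        200L,                              // dfsUsed
        450L,                              // remaining
        200L,                              // blockPoolUsed
        200L);                             // nonDfsUsed (new argument)
    System.out.println(report.getNonDfsUsed()); // 200
  }
}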
hdfs.proto

@@ -99,6 +99,7 @@ message DatanodeInfoProto {
   optional uint64 cacheUsed = 12 [default = 0];
   optional uint64 lastUpdateMonotonic = 13 [default = 0];
   optional string upgradeDomain = 14;
+  optional uint64 nonDfsUsed = 15;
 }
 
 /**
@@ -123,6 +124,7 @@ message StorageReportProto {
   optional uint64 remaining = 5 [ default = 0 ];
   optional uint64 blockPoolUsed = 6 [ default = 0 ];
   optional DatanodeStorageProto storage = 7; // supersedes StorageUuid
+  optional uint64 nonDfsUsed = 8;
 }
 
 /**
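Both additions use fresh tag numbers on optional fields, which is wire-compatible in protocol buffers: an old reader skips the unknown tag, and a message from an old writer simply reports the field as absent, which is exactly what the converters key on. A small sketch, assuming the regenerated HdfsProtos classes are on the classpath (storageUuid is the message's pre-existing first field; the uuid value is hypothetical):

import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;

public class ProtoCompatSketch {
  public static void main(String[] args) throws Exception {
    // Serialize a report the way a pre-patch datanode would: no nonDfsUsed.
    byte[] wire = HdfsProtos.StorageReportProto.newBuilder()
        .setStorageUuid("DS-example")
        .setCapacity(900L).setDfsUsed(200L).setRemaining(400L)
        .build().toByteArray();

    HdfsProtos.StorageReportProto parsed =
        HdfsProtos.StorageReportProto.parseFrom(wire);
    System.out.println(parsed.hasNonDfsUsed()); // false -> fallback path
    System.out.println(parsed.getNonDfsUsed()); // 0, the proto2 default
  }
}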
DatanodeDescriptor.java

@@ -412,6 +412,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     long totalRemaining = 0;
     long totalBlockPoolUsed = 0;
     long totalDfsUsed = 0;
+    long totalNonDfsUsed = 0;
     Set<DatanodeStorageInfo> failedStorageInfos = null;
 
     // Decide if we should check for any missing StorageReport and mark it as
@@ -472,6 +473,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
       totalRemaining += report.getRemaining();
       totalBlockPoolUsed += report.getBlockPoolUsed();
       totalDfsUsed += report.getDfsUsed();
+      totalNonDfsUsed += report.getNonDfsUsed();
     }
     rollBlocksScheduled(getLastUpdateMonotonic());
 
@@ -480,6 +482,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     setRemaining(totalRemaining);
     setBlockPoolUsed(totalBlockPoolUsed);
     setDfsUsed(totalDfsUsed);
+    setNonDfsUsed(totalNonDfsUsed);
     if (checkFailedStorages) {
       updateFailedStorage(failedStorageInfos);
     }
DatanodeStats.java

@@ -35,6 +35,7 @@ class DatanodeStats {
   private final StorageTypeStatsMap statsMap = new StorageTypeStatsMap();
   private long capacityTotal = 0L;
   private long capacityUsed = 0L;
+  private long capacityUsedNonDfs = 0L;
   private long capacityRemaining = 0L;
   private long blockPoolUsed = 0L;
   private int xceiverCount = 0;
@@ -49,6 +50,7 @@ class DatanodeStats {
     xceiverCount += node.getXceiverCount();
     if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
       capacityUsed += node.getDfsUsed();
+      capacityUsedNonDfs += node.getNonDfsUsed();
       blockPoolUsed += node.getBlockPoolUsed();
       nodesInService++;
       nodesInServiceXceiverCount += node.getXceiverCount();
@@ -76,6 +78,7 @@ class DatanodeStats {
     xceiverCount -= node.getXceiverCount();
     if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
       capacityUsed -= node.getDfsUsed();
+      capacityUsedNonDfs -= node.getNonDfsUsed();
       blockPoolUsed -= node.getBlockPoolUsed();
       nodesInService--;
       nodesInServiceXceiverCount -= node.getXceiverCount();
@@ -157,8 +160,7 @@ class DatanodeStats {
   }
 
   synchronized long getCapacityUsedNonDFS() {
-    final long nonDFSUsed = capacityTotal - capacityRemaining - capacityUsed;
-    return nonDFSUsed < 0L? 0L : nonDFSUsed;
+    return capacityUsedNonDfs;
   }
 
   synchronized float getCapacityUsedPercent() {
DatanodeStorageInfo.java

@@ -116,6 +116,7 @@ public class DatanodeStorageInfo {
 
   private long capacity;
   private long dfsUsed;
+  private long nonDfsUsed;
   private volatile long remaining;
   private long blockPoolUsed;
 
@@ -225,6 +226,10 @@ public class DatanodeStorageInfo {
     return dfsUsed;
   }
 
+  long getNonDfsUsed() {
+    return nonDfsUsed;
+  }
+
   long getRemaining() {
     return remaining;
   }
@@ -298,6 +303,7 @@ public class DatanodeStorageInfo {
   void updateState(StorageReport r) {
     capacity = r.getCapacity();
     dfsUsed = r.getDfsUsed();
+    nonDfsUsed = r.getNonDfsUsed();
     remaining = r.getRemaining();
     blockPoolUsed = r.getBlockPoolUsed();
   }
@@ -347,7 +353,7 @@ public class DatanodeStorageInfo {
   StorageReport toStorageReport() {
     return new StorageReport(
         new DatanodeStorage(storageID, state, storageType),
-        false, capacity, dfsUsed, remaining, blockPoolUsed);
+        false, capacity, dfsUsed, remaining, blockPoolUsed, nonDfsUsed);
   }
 
   static Iterable<StorageType> toStorageTypes(
FsDatasetImpl.java

@@ -167,7 +167,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             volume.getCapacity(),
             volume.getDfsUsed(),
             volume.getAvailable(),
-            volume.getBlockPoolUsed(bpid));
+            volume.getBlockPoolUsed(bpid),
+            volume.getNonDfsUsed());
         reports.add(sr);
       } catch (ClosedChannelException e) {
         continue;
FsVolumeImpl.java

@@ -384,15 +384,46 @@ public class FsVolumeImpl implements FsVolumeSpi {
    */
   @Override
   public long getAvailable() throws IOException {
-    long remaining = getCapacity() - getDfsUsed() - reservedForReplicas.get();
-    long available = usage.getAvailable() - reserved
-        - reservedForReplicas.get();
+    long remaining = getCapacity() - getDfsUsed() - getReservedForReplicas();
+    long available = usage.getAvailable() - getRemainingReserved()
+        - getReservedForReplicas();
     if (remaining > available) {
       remaining = available;
     }
     return (remaining > 0) ? remaining : 0;
   }
 
+  long getActualNonDfsUsed() throws IOException {
+    return usage.getUsed() - getDfsUsed();
+  }
+
+  private long getRemainingReserved() throws IOException {
+    long actualNonDfsUsed = getActualNonDfsUsed();
+    if (actualNonDfsUsed < reserved) {
+      return reserved - actualNonDfsUsed;
+    }
+    return 0L;
+  }
+
+  /**
+   * Unplanned non-DFS usage, i.e. extra usage beyond the reserved space.
+   *
+   * @return the non-DFS usage in excess of the configured reserved bytes
+   * @throws IOException if disk usage cannot be queried
+   */
+  public long getNonDfsUsed() throws IOException {
+    long actualNonDfsUsed = getActualNonDfsUsed();
+    if (actualNonDfsUsed < reserved) {
+      return 0L;
+    }
+    return actualNonDfsUsed - reserved;
+  }
+
+  @VisibleForTesting
+  long getDfAvailable() {
+    return usage.getAvailable();
+  }
+
   @VisibleForTesting
   public long getReservedForReplicas() {
     return reservedForReplicas.get();
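Working through getAvailable(), getRemainingReserved() and getNonDfsUsed() with the numbers used by the new TestFsVolumeList case further down (disk 1000, reserve 100, DFS blocks 200, other files 300, 50 reserved for in-flight replicas), as a runnable sketch of the arithmetic rather than of the class itself:

public class VolumeAccountingExample {
  public static void main(String[] args) {
    long diskCapacity = 1000L, reserved = 100L, dfsUsed = 200L;
    long actualNonDfsUsed = 300L, reservedForReplicas = 50L;

    long capacity = diskCapacity - reserved;                      // 900
    long dfAvailable = diskCapacity - dfsUsed - actualNonDfsUsed; // 500

    // getRemainingReserved(): the part of the reserve not yet consumed by
    // non-DFS files; here none, since 300 >= 100.
    long remainingReserved = Math.max(reserved - actualNonDfsUsed, 0L); // 0

    // getAvailable(): the smaller of the accounting view and the df view.
    long remaining = Math.min(
        capacity - dfsUsed - reservedForReplicas,               // 650
        dfAvailable - remainingReserved - reservedForReplicas); // 450

    // getNonDfsUsed(): only usage beyond the reserve is reported.
    long nonDfsUsed = Math.max(actualNonDfsUsed - reserved, 0L); // 200

    // Before the patch, the full reserve was subtracted from available
    // unconditionally, and non-DFS used was derived from the remainder:
    long oldRemaining = Math.min(
        capacity - dfsUsed - reservedForReplicas,      // 650
        dfAvailable - reserved - reservedForReplicas); // 350
    long oldNonDfsUsed = capacity - dfsUsed - oldRemaining; // 350

    System.out.println("new: remaining=" + remaining
        + " nonDfsUsed=" + nonDfsUsed);    // 450 / 200
    System.out.println("old: remaining=" + oldRemaining
        + " nonDfsUsed=" + oldNonDfsUsed); // 350 / 350
  }
}

The 200 here is exactly the expectedNonDfsUsage asserted by that test.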
TestPBHelper.java

@@ -646,4 +646,34 @@ public class TestPBHelper {
         .build();
     Assert.assertEquals(s, PBHelperClient.convert(PBHelperClient.convert(s)));
   }
+
+  private void assertDnInfosEqual(DatanodeInfo[] dnInfos1,
+      DatanodeInfo[] dnInfos2) {
+    assertEquals(dnInfos1.length, dnInfos2.length);
+    for (int i = 0; i < dnInfos1.length; i++) {
+      compare(dnInfos1[i], dnInfos2[i]);
+    }
+  }
+
+  @Test
+  public void testDataNodeInfoPBHelper() {
+    DatanodeID id = DFSTestUtil.getLocalDatanodeID();
+    DatanodeInfo dnInfos0 = new DatanodeInfo(id);
+    dnInfos0.setCapacity(3500L);
+    dnInfos0.setDfsUsed(1000L);
+    dnInfos0.setNonDfsUsed(2000L);
+    dnInfos0.setRemaining(500L);
+    HdfsProtos.DatanodeInfoProto dnproto = PBHelperClient.convert(dnInfos0);
+    DatanodeInfo dnInfos1 = PBHelperClient.convert(dnproto);
+    compare(dnInfos0, dnInfos1);
+    assertEquals(dnInfos0.getNonDfsUsed(), dnInfos1.getNonDfsUsed());
+
+    // Testing without nonDfs field
+    HdfsProtos.DatanodeInfoProto.Builder b =
+        HdfsProtos.DatanodeInfoProto.newBuilder();
+    b.setId(PBHelperClient.convert(id)).setCapacity(3500L).setDfsUsed(1000L)
+        .setRemaining(500L);
+    DatanodeInfo dnInfos3 = PBHelperClient.convert(b.build());
+    assertEquals(dnInfos0.getNonDfsUsed(), dnInfos3.getNonDfsUsed());
+  }
 }
BlockManagerTestUtil.java

@@ -292,7 +292,7 @@ public class BlockManagerTestUtil {
       StorageReport report = new StorageReport(
           dns ,false, storage.getCapacity(),
           storage.getDfsUsed(), storage.getRemaining(),
-          storage.getBlockPoolUsed());
+          storage.getBlockPoolUsed(), 0);
       reports.add(report);
     }
     return reports.toArray(StorageReport.EMPTY_ARRAY);
TestBlockManager.java

@@ -1132,7 +1132,7 @@ public class TestBlockManager {
         StorageReport report = new StorageReport(
             dns, true, storageInfo.getCapacity(),
             storageInfo.getDfsUsed(), storageInfo.getRemaining(),
-            storageInfo.getBlockPoolUsed());
+            storageInfo.getBlockPoolUsed(), 0L);
         reports.add(report);
         break;
       }
SimulatedFSDataset.java

@@ -440,7 +440,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     synchronized StorageReport getStorageReport(String bpid) {
       return new StorageReport(dnStorage,
           false, getCapacity(), getUsed(), getFree(),
-          map.get(bpid).getUsed());
+          map.get(bpid).getUsed(), 0L);
     }
   }
 
ExternalDatasetImpl.java

@@ -73,7 +73,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   @Override
   public StorageReport[] getStorageReports(String bpid) throws IOException {
     StorageReport[] result = new StorageReport[1];
-    result[0] = new StorageReport(storage, false, 0, 0, 0, 0);
+    result[0] = new StorageReport(storage, false, 0, 0, 0, 0, 0);
     return result;
   }
 
TestFsVolumeList.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
@@ -177,4 +178,43 @@ public class TestFsVolumeList {
         conf, StorageType.DEFAULT);
     assertEquals("", 100L, volume4.getReserved());
   }
+
+  @Test
+  public void testNonDfsUsedMetricForVolume() throws Exception {
+    File volDir = new File(baseDir, "volume-0");
+    volDir.mkdirs();
+    /*
+     * Let's use this example:
+     * Capacity - 1000
+     * Reserved - 100
+     * DfsUsed - 200
+     * Actual Non-DfsUsed - 300 --> (expected)
+     * ReservedForReplicas - 50
+     */
+    long diskCapacity = 1000L;
+    long duReserved = 100L;
+    long dfsUsage = 200L;
+    long actualNonDfsUsage = 300L;
+    long reservedForReplicas = 50L;
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, duReserved);
+    FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", volDir, conf,
+        StorageType.DEFAULT);
+    FsVolumeImpl spyVolume = Mockito.spy(volume);
+    // Set capacity for testing
+    long testCapacity = diskCapacity - duReserved;
+    spyVolume.setCapacityForTesting(testCapacity);
+    // Mock volume.getDfAvailable()
+    long dfAvailable = diskCapacity - dfsUsage - actualNonDfsUsage;
+    Mockito.doReturn(dfAvailable).when(spyVolume).getDfAvailable();
+    // Mock dfsUsage
+    Mockito.doReturn(dfsUsage).when(spyVolume).getDfsUsed();
+    // Mock reservedForReplicas
+    Mockito.doReturn(reservedForReplicas).when(spyVolume)
+        .getReservedForReplicas();
+    Mockito.doReturn(actualNonDfsUsage).when(spyVolume)
+        .getActualNonDfsUsed();
+    long expectedNonDfsUsage =
+        actualNonDfsUsage - duReserved;
+    assertEquals(expectedNonDfsUsage, spyVolume.getNonDfsUsed());
+  }
 }
NNThroughputBenchmark.java

@@ -949,7 +949,7 @@ public class NNThroughputBenchmark implements Tool {
       // register datanode
       // TODO:FEDERATION currently a single block pool is supported
       StorageReport[] rep = { new StorageReport(storage, false,
-          DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
+          DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null, true).getCommands();
       if(cmds != null) {
@@ -998,7 +998,7 @@ public class NNThroughputBenchmark implements Tool {
     int replicateBlocks() throws IOException {
       // register datanode
       StorageReport[] rep = { new StorageReport(storage,
-          false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
+          false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
       if (cmds != null) {
TestDeadDatanode.java

@@ -128,7 +128,7 @@ public class TestDeadDatanode {
     // that asks datanode to register again
     StorageReport[] rep = { new StorageReport(
         new DatanodeStorage(reg.getDatanodeUuid()),
-        false, 0, 0, 0, 0) };
+        false, 0, 0, 0, 0, 0) };
     DatanodeCommand[] cmd =
         dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
     assertEquals(1, cmd.length);
TestNamenodeCapacityReport.java

@@ -71,7 +71,6 @@ public class TestNamenodeCapacityReport {
     try {
       cluster = new MiniDFSCluster.Builder(conf).build();
       cluster.waitActive();
-
       final FSNamesystem namesystem = cluster.getNamesystem();
       final DatanodeManager dm = cluster.getNamesystem().getBlockManager(
           ).getDatanodeManager();

@@ -100,8 +99,9 @@ public class TestNamenodeCapacityReport {
           + " used " + used + " non DFS used " + nonDFSUsed
           + " remaining " + remaining + " perentUsed " + percentUsed
           + " percentRemaining " + percentRemaining);
-
-      assertTrue(configCapacity == (used + remaining + nonDFSUsed));
+      // There will be 5% space reserved in the ext filesystem which is not
+      // considered here.
+      assertTrue(configCapacity >= (used + remaining + nonDFSUsed));
       assertTrue(percentUsed == DFSUtilClient.getPercentUsed(used,
           configCapacity));
       assertTrue(percentRemaining == DFSUtilClient.getPercentRemaining(

@@ -148,7 +148,9 @@ public class TestNamenodeCapacityReport {
       assertTrue(configCapacity == diskCapacity - reserved);
 
       // Ensure new total capacity reported excludes the reserved space
-      assertTrue(configCapacity == (used + remaining + nonDFSUsed));
+      // There will be 5% space reserved in the ext filesystem which is not
+      // considered here.
+      assertTrue(configCapacity >= (used + remaining + nonDFSUsed));
 
       // Ensure percent used is calculated based on used and present capacity
       assertTrue(percentUsed == DFSUtilClient.getPercentUsed(used,

@@ -160,9 +162,33 @@ public class TestNamenodeCapacityReport {
 
       // Ensure percent used is calculated based on used and present capacity
       assertTrue(percentRemaining == ((float)remaining * 100.0f)/(float)configCapacity);
+
+      // Test case for non-DFS used where we also need to consider
+      // space reserved for replicas.
+      final int fileCount = 5;
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      // create streams and hsync to force datastreamers to start
+      DFSOutputStream[] streams = new DFSOutputStream[fileCount];
+      for (int i = 0; i < fileCount; i++) {
+        streams[i] = (DFSOutputStream) fs.create(new Path("/f" + i))
+            .getWrappedStream();
+        streams[i].write("1".getBytes());
+        streams[i].hsync();
+      }
+      triggerHeartbeats(cluster.getDataNodes());
+      assertTrue(configCapacity > (namesystem.getCapacityUsed() + namesystem
+          .getCapacityRemaining() + namesystem.getNonDfsUsedSpace()));
+      // There is a chance that non-DFS usage has grown slightly due to
+      // test logs, so allow a small slack (1 * 1024 bytes) within this gap.
+      assertTrue(
+          (namesystem.getCapacityUsed() + namesystem.getCapacityRemaining()
+              + namesystem.getNonDfsUsedSpace() + fileCount * fs
+                  .getDefaultBlockSize()) - configCapacity < 1 * 1024);
     }
     finally {
-      if (cluster != null) {cluster.shutdown();}
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
TestNameNodeMetrics.java

@@ -172,7 +172,9 @@ public class TestNameNodeMetrics {
         MetricsAsserts.getLongGauge("CapacityRemaining", rb);
     long capacityUsedNonDFS =
         MetricsAsserts.getLongGauge("CapacityUsedNonDFS", rb);
-    assert(capacityUsed + capacityRemaining + capacityUsedNonDFS ==
+    // There will be 5% space reserved in the ext filesystem which is not
+    // considered here.
+    assert (capacityUsed + capacityRemaining + capacityUsedNonDFS <=
         capacityTotal);
   }
 