HDFS-15548. Allow configuring DISK/ARCHIVE storage types on same device mount (#2288). Contributed by Leon Gao.
commit 9a9ab5b48e
parent 1ea3f74246

@@ -1961,7 +1961,7 @@ public class PBHelperClient {
     return new StorageReport(p.hasStorage() ? convert(p.getStorage())
         : new DatanodeStorage(p.getStorageUuid()), p.getFailed(),
         p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
-        p.getBlockPoolUsed(), nonDfsUsed);
+        p.getBlockPoolUsed(), nonDfsUsed, p.getMount());
   }

   public static DatanodeStorage convert(DatanodeStorageProto s) {

@@ -2696,7 +2696,8 @@ public class PBHelperClient {
         .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
         .setStorageUuid(r.getStorage().getStorageID())
         .setStorage(convert(r.getStorage()))
-        .setNonDfsUsed(r.getNonDfsUsed());
+        .setNonDfsUsed(r.getNonDfsUsed())
+        .setMount(r.getMount());
     return builder.build();
   }

@@ -28,11 +28,19 @@ public class StorageReport {
   private final long nonDfsUsed;
   private final long remaining;
   private final long blockPoolUsed;
+  private final String mount;

   public static final StorageReport[] EMPTY_ARRAY = {};

   public StorageReport(DatanodeStorage storage, boolean failed, long capacity,
       long dfsUsed, long remaining, long bpUsed, long nonDfsUsed) {
+    this(storage, failed, capacity, dfsUsed,
+        remaining, bpUsed, nonDfsUsed, "");
+  }
+
+  public StorageReport(DatanodeStorage storage, boolean failed, long capacity,
+      long dfsUsed, long remaining, long bpUsed,
+      long nonDfsUsed, String mount) {
     this.storage = storage;
     this.failed = failed;
     this.capacity = capacity;

@@ -40,6 +48,7 @@ public class StorageReport {
     this.nonDfsUsed = nonDfsUsed;
     this.remaining = remaining;
     this.blockPoolUsed = bpUsed;
+    this.mount = mount;
   }

   public DatanodeStorage getStorage() {

@@ -69,4 +78,8 @@ public class StorageReport {
   public long getBlockPoolUsed() {
     return blockPoolUsed;
   }
+
+  public String getMount() {
+    return mount;
+  }
 }

@@ -158,6 +158,7 @@ message StorageReportProto {
   optional uint64 blockPoolUsed = 6 [ default = 0 ];
   optional DatanodeStorageProto storage = 7; // supersedes StorageUuid
   optional uint64 nonDfsUsed = 8;
+  optional string mount = 9;
 }

 /**

@@ -1516,6 +1516,26 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final boolean DFS_PROTECTED_SUBDIRECTORIES_ENABLE_DEFAULT =
       false;

+  /**
+   * HDFS-15548 to allow DISK/ARCHIVE to be configured on the same disk mount.
+   * The default ratio will be applied if DISK/ARCHIVE are configured
+   * on the same disk mount.
+   *
+   * Beware that capacity usage might be larger than 100% if data blocks
+   * already exist and the configured ratio is small, which will
+   * prevent the volume from taking new blocks until capacity is balanced out.
+   */
+  public static final String DFS_DATANODE_ALLOW_SAME_DISK_TIERING =
+      "dfs.datanode.same-disk-tiering.enabled";
+  public static final boolean DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT =
+      false;
+
+  public static final String
+      DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE =
+      "dfs.datanode.reserve-for-archive.default.percentage";
+  public static final double
+      DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE_DEFAULT = 0.0;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
   @Deprecated
   public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY

@@ -404,6 +404,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     long totalBlockPoolUsed = 0;
     long totalDfsUsed = 0;
     long totalNonDfsUsed = 0;
+    Set<String> visitedMount = new HashSet<>();
     Set<DatanodeStorageInfo> failedStorageInfos = null;

     // Decide if we should check for any missing StorageReport and mark it as

@@ -472,7 +473,17 @@ public class DatanodeDescriptor extends DatanodeInfo {
       totalRemaining += report.getRemaining();
       totalBlockPoolUsed += report.getBlockPoolUsed();
       totalDfsUsed += report.getDfsUsed();
-      totalNonDfsUsed += report.getNonDfsUsed();
+      String mount = report.getMount();
+      // For volumes on the same mount,
+      // ignore duplicated volumes for nonDfsUsed.
+      if (mount == null || mount.isEmpty()) {
+        totalNonDfsUsed += report.getNonDfsUsed();
+      } else {
+        if (!visitedMount.contains(mount)) {
+          totalNonDfsUsed += report.getNonDfsUsed();
+          visitedMount.add(mount);
+        }
+      }
     }

     // Update total metrics for the node.

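The aggregation above adds nonDfsUsed only once per mount, because every volume on a mount reports the same non-HDFS usage of the underlying disk. A minimal standalone sketch of that deduplication (not part of the patch; the mounts and byte counts below are made up for illustration):

import java.util.HashSet;
import java.util.Set;

public class NonDfsUsedAggregationSketch {
  public static void main(String[] args) {
    // Three storage reports: a DISK and an ARCHIVE volume sharing /data/1,
    // plus a volume on /data/2. Each report carries its mount's nonDfsUsed.
    String[] mounts = {"/data/1", "/data/1", "/data/2"};
    long[] nonDfsUsed = {400L, 400L, 250L};

    long totalNonDfsUsed = 0;
    Set<String> visitedMount = new HashSet<>();
    for (int i = 0; i < mounts.length; i++) {
      String mount = mounts[i];
      // Count each mount's nonDfsUsed once; reports without a mount string
      // (e.g. from older DataNodes) are always counted, as in the patch.
      if (mount == null || mount.isEmpty() || visitedMount.add(mount)) {
        totalNonDfsUsed += nonDfsUsed[i];
      }
    }
    System.out.println(totalNonDfsUsed); // prints 650, not 1050
  }
}
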
@@ -171,7 +171,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             volume.getDfsUsed(),
             volume.getAvailable(),
             volume.getBlockPoolUsed(bpid),
-            volume.getNonDfsUsed());
+            volume.getNonDfsUsed(),
+            volume.getMount()
+        );
         reports.add(sr);
       } catch (ClosedChannelException e) {
         continue;

@@ -190,6 +192,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }

+  MountVolumeMap getMountVolumeMap() {
+    return volumes.getMountVolumeMap();
+  }
+
   @Override // FsDatasetSpi
   public Block getStoredBlock(String bpid, long blkid)
       throws IOException {

@@ -365,7 +371,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         RoundRobinVolumeChoosingPolicy.class,
         VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
-        blockChooserImpl);
+        blockChooserImpl, conf);
     asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
     asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode, conf);
     deletingBlock = new HashMap<String, Set<Long>>();

@@ -464,12 +470,27 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         LOG.error(errorMsg);
         throw new IOException(errorMsg);
       }
+      // Check if the same storage type already exists on the mount.
+      // Only useful when same disk tiering is turned on.
+      FsVolumeImpl volumeImpl = (FsVolumeImpl) ref.getVolume();
+      FsVolumeReference checkRef = volumes
+          .getMountVolumeMap()
+          .getVolumeRefByMountAndStorageType(
+              volumeImpl.getMount(), volumeImpl.getStorageType());
+      if (checkRef != null) {
+        final String errorMsg = String.format(
+            "Storage type %s already exists on same mount: %s.",
+            volumeImpl.getStorageType(), volumeImpl.getMount());
+        checkRef.close();
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
       volumeMap.mergeAll(replicaMap);
       storageMap.put(sd.getStorageUuid(),
           new DatanodeStorage(sd.getStorageUuid(),
               DatanodeStorage.State.NORMAL,
               storageType));
-      asyncDiskService.addVolume((FsVolumeImpl) ref.getVolume());
+      asyncDiskService.addVolume(volumeImpl);
       volumes.addVolume(ref);
     }
   }

@@ -134,6 +134,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
   private final FileIoProvider fileIoProvider;
   private final DataNodeVolumeMetrics metrics;
   private URI baseURI;
+  private boolean enableSameDiskTiering;
+  private final String mount;
+  private double reservedForArchive;

   /**
    * Per-volume worker pool that processes new blocks to cache.

@@ -190,6 +193,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
     this.conf = conf;
     this.fileIoProvider = fileIoProvider;
+    this.enableSameDiskTiering =
+        conf.getBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+            DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT);
+    if (enableSameDiskTiering && usage != null) {
+      this.mount = usage.getMount();
+    } else {
+      mount = "";
+    }
   }
+
+  String getMount() {
+    return mount;
+  }

   protected ThreadPoolExecutor initializeCacheExecutor(File parent) {

@@ -407,11 +422,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
    * Return either the configured capacity of the file system if configured; or
    * the capacity of the file system excluding space reserved for non-HDFS.
    *
+   * When same-disk-tiering is turned on, the reported capacity
+   * takes the reservedForArchive value into consideration.
+   *
    * @return the unreserved number of bytes left in this filesystem. May be
    *         zero.
    */
   @VisibleForTesting
   public long getCapacity() {
+    long capacity;
     if (configuredCapacity < 0L) {
       long remaining;
       if (cachedCapacity > 0L) {

@@ -419,9 +438,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
       } else {
         remaining = usage.getCapacity() - getReserved();
       }
-      return Math.max(remaining, 0L);
+      capacity = Math.max(remaining, 0L);
+    } else {
+      capacity = configuredCapacity;
     }
-    return configuredCapacity;
+
+    if (enableSameDiskTiering && dataset.getMountVolumeMap() != null) {
+      double capacityRatio = dataset.getMountVolumeMap()
+          .getCapacityRatioByMountAndStorageType(mount, storageType);
+      capacity = (long) (capacity * capacityRatio);
+    }
+
+    return capacity;
   }

   /**

@@ -452,7 +480,34 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }

   long getActualNonDfsUsed() throws IOException {
-    return usage.getUsed() - getDfsUsed();
+    // DISK and ARCHIVE volumes on the same disk
+    // share the same amount of reserved capacity.
+    // When calculating actual non-DFS used,
+    // exclude the DFS capacity used by the other volume.
+    if (enableSameDiskTiering &&
+        (storageType == StorageType.DISK
+            || storageType == StorageType.ARCHIVE)) {
+      StorageType counterpartStorageType = storageType == StorageType.DISK
+          ? StorageType.ARCHIVE : StorageType.DISK;
+      FsVolumeReference counterpartRef = dataset
+          .getMountVolumeMap()
+          .getVolumeRefByMountAndStorageType(mount, counterpartStorageType);
+      if (counterpartRef != null) {
+        FsVolumeImpl counterpartVol = (FsVolumeImpl) counterpartRef.getVolume();
+        long used = getDfUsed() - getDfsUsed() - counterpartVol.getDfsUsed();
+        counterpartRef.close();
+        return used;
+      }
+    }
+    return getDfUsed() - getDfsUsed();
   }

+  /**
+   * This function is only used for Mock.
+   */
+  @VisibleForTesting
+  public long getDfUsed() {
+    return usage.getUsed();
+  }
+
   private long getRemainingReserved() throws IOException {

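For concreteness, the same numbers used by testDfsUsageStatWithSameDiskArchival further down give the following picture for one DISK and one ARCHIVE volume sharing a mount with df capacity 1100, du reserved 100, df used 700, and reserve-for-archive ratio 0.75:

  base capacity          = 1100 - 100        = 1000
  ARCHIVE getCapacity()  = 1000 * 0.75       = 750
  DISK getCapacity()     = 1000 * (1 - 0.75) = 250
  getActualNonDfsUsed()  = dfUsed - own dfsUsed - counterpart dfsUsed
                         = 700 - 100 - 200   = 400   (same result on both volumes)

If one of the two volumes is later removed from the mount, the capacity ratio for the remaining volume falls back to 1 and it reports the full 1000 again.
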
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.Condition;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

@@ -62,9 +63,13 @@ class FsVolumeList {
   private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
   private final BlockScanner blockScanner;

+  private final boolean enableSameDiskTiering;
+  private final MountVolumeMap mountVolumeMap;
+
   FsVolumeList(List<VolumeFailureInfo> initialVolumeFailureInfos,
       BlockScanner blockScanner,
-      VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
+      VolumeChoosingPolicy<FsVolumeImpl> blockChooser,
+      Configuration config) {
     this.blockChooser = blockChooser;
     this.blockScanner = blockScanner;
     this.checkDirsLock = new AutoCloseableLock();

@@ -73,6 +78,14 @@ class FsVolumeList {
       volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
           volumeFailureInfo);
     }
+    enableSameDiskTiering = config.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+        DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT);
+    mountVolumeMap = new MountVolumeMap(config);
+  }
+
+  MountVolumeMap getMountVolumeMap() {
+    return mountVolumeMap;
   }

   /**

@@ -291,6 +304,9 @@ class FsVolumeList {
   void addVolume(FsVolumeReference ref) {
     FsVolumeImpl volume = (FsVolumeImpl) ref.getVolume();
     volumes.add(volume);
+    if (isSameDiskTieringApplied(volume)) {
+      mountVolumeMap.addVolume(volume);
+    }
     if (blockScanner != null) {
       blockScanner.addVolumeScanner(ref);
     } else {

@@ -311,6 +327,9 @@ class FsVolumeList {
    */
   private void removeVolume(FsVolumeImpl target) {
     if (volumes.remove(target)) {
+      if (isSameDiskTieringApplied(target)) {
+        mountVolumeMap.removeVolume(target);
+      }
       if (blockScanner != null) {
         blockScanner.removeVolumeScanner(target);
       }

@@ -331,6 +350,15 @@ class FsVolumeList {
     }
   }

+  /**
+   * Check if same disk tiering is applied to the volume.
+   */
+  private boolean isSameDiskTieringApplied(FsVolumeImpl target) {
+    return enableSameDiskTiering &&
+        (target.getStorageType() == StorageType.DISK
+            || target.getStorageType() == StorageType.ARCHIVE);
+  }
+
   /**
    * Dynamically remove volume in the list.
    * @param storageLocation {@link StorageLocation} of the volume to be removed.

@@ -0,0 +1,113 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;

import java.nio.channels.ClosedChannelException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * MountVolumeInfo is a wrapper of
 * detailed volume information for MountVolumeMap.
 */
@InterfaceAudience.Private
class MountVolumeInfo {
  private final ConcurrentMap<StorageType, FsVolumeImpl>
      storageTypeVolumeMap;
  private double reservedForArchiveDefault;

  MountVolumeInfo(Configuration conf) {
    storageTypeVolumeMap = new ConcurrentHashMap<>();
    reservedForArchiveDefault = conf.getDouble(
        DFSConfigKeys.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
        DFSConfigKeys
            .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE_DEFAULT);
    if (reservedForArchiveDefault > 1) {
      FsDatasetImpl.LOG.warn("Value of reserve-for-archival is > 100%." +
          " Setting it to 100%.");
      reservedForArchiveDefault = 1;
    }
    if (reservedForArchiveDefault < 0) {
      FsDatasetImpl.LOG.warn("Value of reserve-for-archival is < 0." +
          " Setting it to 0.0");
      reservedForArchiveDefault = 0;
    }
  }

  FsVolumeReference getVolumeRef(StorageType storageType) {
    try {
      FsVolumeImpl volumeImpl = storageTypeVolumeMap
          .getOrDefault(storageType, null);
      if (volumeImpl != null) {
        return volumeImpl.obtainReference();
      }
    } catch (ClosedChannelException e) {
      FsDatasetImpl.LOG.warn("Volume closed when getting volume" +
          " by storage type: " + storageType);
    }
    return null;
  }

  /**
   * Return the configured capacity ratio.
   * If the volume is the only one on the mount,
   * return 1 to avoid unnecessary allocation.
   *
   * TODO: We should support customized capacity ratio for volumes.
   */
  double getCapacityRatio(StorageType storageType) {
    if (storageTypeVolumeMap.containsKey(storageType)
        && storageTypeVolumeMap.size() > 1) {
      if (storageType == StorageType.ARCHIVE) {
        return reservedForArchiveDefault;
      } else if (storageType == StorageType.DISK) {
        return 1 - reservedForArchiveDefault;
      }
    }
    return 1;
  }

  /**
   * Add a volume to the mapping.
   * If the storage type already exists on the same mount, skip this volume.
   */
  boolean addVolume(FsVolumeImpl volume) {
    if (storageTypeVolumeMap.containsKey(volume.getStorageType())) {
      FsDatasetImpl.LOG.error("Found storage type that already exists." +
          " Skipping for now. Please check disk configuration");
      return false;
    }
    storageTypeVolumeMap.put(volume.getStorageType(), volume);
    return true;
  }

  void removeVolume(FsVolumeImpl target) {
    storageTypeVolumeMap.remove(target.getStorageType());
  }

  int size() {
    return storageTypeVolumeMap.size();
  }
}

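As a concrete reading of getCapacityRatio(): with dfs.datanode.reserve-for-archive.default.percentage set to 0.3 and both a DISK and an ARCHIVE volume registered for the same mount, the ARCHIVE volume gets a ratio of 0.3 and the DISK volume 0.7; if only one of the two storage types is present on the mount, the ratio is 1 and that volume keeps the full capacity.
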
@@ -0,0 +1,92 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * MountVolumeMap contains information about the relationship
 * between the underlying filesystem mount and datanode volumes.
 *
 * This is useful when configuring block tiering on the same disk mount
 * (HDFS-15548). For now, we do not support multiple volumes with the same
 * storage type on one mount.
 */
@InterfaceAudience.Private
class MountVolumeMap {
  private final ConcurrentMap<String, MountVolumeInfo>
      mountVolumeMapping;
  private final Configuration conf;

  MountVolumeMap(Configuration conf) {
    mountVolumeMapping = new ConcurrentHashMap<>();
    this.conf = conf;
  }

  FsVolumeReference getVolumeRefByMountAndStorageType(String mount,
      StorageType storageType) {
    if (mountVolumeMapping.containsKey(mount)) {
      return mountVolumeMapping
          .get(mount).getVolumeRef(storageType);
    }
    return null;
  }

  /**
   * Return the capacity ratio.
   * If the mount is not in the map, return 1 to use the full capacity.
   */
  double getCapacityRatioByMountAndStorageType(String mount,
      StorageType storageType) {
    if (mountVolumeMapping.containsKey(mount)) {
      return mountVolumeMapping.get(mount).getCapacityRatio(storageType);
    }
    return 1;
  }

  void addVolume(FsVolumeImpl volume) {
    String mount = volume.getMount();
    if (!mount.isEmpty()) {
      MountVolumeInfo info;
      if (mountVolumeMapping.containsKey(mount)) {
        info = mountVolumeMapping.get(mount);
      } else {
        info = new MountVolumeInfo(conf);
        mountVolumeMapping.put(mount, info);
      }
      info.addVolume(volume);
    }
  }

  void removeVolume(FsVolumeImpl target) {
    String mount = target.getMount();
    if (!mount.isEmpty()) {
      MountVolumeInfo info = mountVolumeMapping.get(mount);
      info.removeVolume(target);
      if (info.size() == 0) {
        mountVolumeMapping.remove(mount);
      }
    }
  }
}

@@ -6012,4 +6012,29 @@
   </description>
 </property>

+<property>
+  <name>dfs.datanode.same-disk-tiering.enabled</name>
+  <value>false</value>
+  <description>
+    HDFS-15548 to allow DISK/ARCHIVE to be
+    configured on the same disk mount to manage disk IO.
+    When this is enabled, the datanode will control the capacity
+    of DISK/ARCHIVE based on dfs.datanode.reserve-for-archive.default.percentage.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.reserve-for-archive.default.percentage</name>
+  <value>0.0</value>
+  <description>
+    Default disk capacity ratio of the ARCHIVE volume,
+    expected to be between 0 and 1.
+    This will be applied when DISK/ARCHIVE volumes are configured
+    on the same mount, which is detected by the datanode.
+    Beware that capacity usage might be >100% if data blocks
+    already exist and the configured ratio is small, which will
+    prevent the volume from taking new blocks
+    until capacity is balanced out.
+  </description>
+</property>
 </configuration>

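The two properties above are typically set together with a data-dir layout that places a DISK directory and an ARCHIVE directory on the same mount. A minimal sketch of such a configuration in code (not part of the patch; the /data/1 paths and the 0.3 ratio are illustrative assumptions):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class SameDiskTieringConfSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // One DISK and one ARCHIVE volume on the same mount (hypothetical paths).
    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
        "[DISK]/data/1/dn-disk,[ARCHIVE]/data/1/dn-archive");
    // Let the DataNode treat both volumes as sharing one mount ...
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING, true);
    // ... and hand 30% of the mount's capacity to the ARCHIVE volume.
    conf.setDouble(
        DFSConfigKeys.DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
        0.3);
    System.out.println(conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
  }
}
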
@@ -166,6 +166,26 @@ public class TestFsDatasetImpl {
     when(storage.getNumStorageDirs()).thenReturn(numDirs);
   }

+  private static StorageLocation createStorageWithStorageType(String subDir,
+      StorageType storageType, Configuration conf, DataStorage storage,
+      DataNode dataNode) throws IOException {
+    String archiveStorageType = "[" + storageType + "]";
+    String path = BASE_DIR + subDir;
+    new File(path).mkdirs();
+    String pathUri = new Path(path).toUri().toString();
+    StorageLocation loc = StorageLocation.parse(archiveStorageType + pathUri);
+    Storage.StorageDirectory sd = new Storage.StorageDirectory(
+        loc);
+    DataStorage.createStorageID(sd, false, conf);
+
+    DataStorage.VolumeBuilder builder =
+        new DataStorage.VolumeBuilder(storage, sd);
+    when(storage.prepareVolume(eq(dataNode), eq(loc),
+        anyList()))
+        .thenReturn(builder);
+    return loc;
+  }
+
   private int getNumVolumes() {
     try (FsDatasetSpi.FsVolumeReferences volumes =
         dataset.getFsVolumeReferences()) {

@@ -358,6 +378,57 @@ public class TestFsDatasetImpl {
     assertTrue(actualVolumes.containsAll(expectedVolumes));
   }

+  // When turning on same disk tiering,
+  // we should prevent the misconfiguration where volumes
+  // with the same storage type are created on the same mount.
+  @Test
+  public void testAddVolumeWithSameDiskTiering() throws IOException {
+    datanode = mock(DataNode.class);
+    storage = mock(DataStorage.class);
+    this.conf = new Configuration();
+    this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
+    this.conf.set(DFSConfigKeys.DFS_DATANODE_REPLICA_CACHE_ROOT_DIR_KEY,
+        replicaCacheRootDir);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+        true);
+    conf.setDouble(DFSConfigKeys
+            .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
+        0.5);
+
+    when(datanode.getConf()).thenReturn(conf);
+    final DNConf dnConf = new DNConf(datanode);
+    when(datanode.getDnConf()).thenReturn(dnConf);
+    final BlockScanner disabledBlockScanner = new BlockScanner(datanode);
+    when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
+    final ShortCircuitRegistry shortCircuitRegistry =
+        new ShortCircuitRegistry(conf);
+    when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
+
+    createStorageDirs(storage, conf, 1);
+    dataset = new FsDatasetImpl(datanode, storage, conf);
+
+    List<NamespaceInfo> nsInfos = Lists.newArrayList();
+    for (String bpid : BLOCK_POOL_IDS) {
+      nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, 1));
+    }
+    dataset.addVolume(
+        createStorageWithStorageType("archive1",
+            StorageType.ARCHIVE, conf, storage, datanode), nsInfos);
+    assertEquals(2, dataset.getVolumeCount());
+
+    // Adding a second ARCHIVE volume should fail in FsDatasetImpl.
+    try {
+      dataset.addVolume(
+          createStorageWithStorageType("archive2",
+              StorageType.ARCHIVE, conf, storage, datanode), nsInfos);
+      fail("Should throw exception for" +
+          " same storage type already exists on same mount.");
+    } catch (IOException e) {
+      assertTrue(e.getMessage()
+          .startsWith("Storage type ARCHIVE already exists on same mount:"));
+    }
+  }
+
   @Test
   public void testAddVolumeWithSameStorageUuid() throws IOException {
     HdfsConfiguration config = new HdfsConfiguration();

@@ -88,7 +88,8 @@ public class TestFsVolumeList {
   @Test(timeout=30000)
   public void testGetNextVolumeWithClosedVolume() throws IOException {
     FsVolumeList volumeList = new FsVolumeList(
-        Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
+        Collections.<VolumeFailureInfo>emptyList(),
+        blockScanner, blockChooser, conf);
     final List<FsVolumeImpl> volumes = new ArrayList<>();
     for (int i = 0; i < 3; i++) {
       File curDir = new File(baseDir, "nextvolume-" + i);

@@ -131,7 +132,7 @@ public class TestFsVolumeList {
   @Test(timeout=30000)
   public void testReleaseVolumeRefIfNoBlockScanner() throws IOException {
     FsVolumeList volumeList = new FsVolumeList(
-        Collections.<VolumeFailureInfo>emptyList(), null, blockChooser);
+        Collections.<VolumeFailureInfo>emptyList(), null, blockChooser, conf);
     File volDir = new File(baseDir, "volume-0");
     volDir.mkdirs();
     FsVolumeImpl volume = new FsVolumeImplBuilder()

@@ -478,4 +479,145 @@ public class TestFsVolumeList {
     conf.setBoolean(DFSConfigKeys.DFS_DATANODE_FIXED_VOLUME_SIZE_KEY,
         DFSConfigKeys.DFS_DATANODE_FIXED_VOLUME_SIZE_DEFAULT);
   }
+
+  // Test basics with same disk archival turned on.
+  @Test
+  public void testGetVolumeWithSameDiskArchival() throws Exception {
+    File diskVolDir = new File(baseDir, "volume-disk");
+    File archivalVolDir = new File(baseDir, "volume-archival");
+    diskVolDir.mkdirs();
+    archivalVolDir.mkdirs();
+    double reservedForArchival = 0.75;
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+        true);
+    conf.setDouble(DFSConfigKeys
+            .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
+        reservedForArchival);
+    FsVolumeImpl diskVolume = new FsVolumeImplBuilder()
+        .setConf(conf)
+        .setDataset(dataset)
+        .setStorageID("storage-id")
+        .setStorageDirectory(
+            new StorageDirectory(
+                StorageLocation.parse(diskVolDir.getPath())))
+        .build();
+    FsVolumeImpl archivalVolume = new FsVolumeImplBuilder()
+        .setConf(conf)
+        .setDataset(dataset)
+        .setStorageID("storage-id")
+        .setStorageDirectory(
+            new StorageDirectory(StorageLocation
+                .parse("[ARCHIVE]" + archivalVolDir.getPath())))
+        .build();
+    FsVolumeList volumeList = new FsVolumeList(
+        Collections.<VolumeFailureInfo>emptyList(),
+        blockScanner, blockChooser, conf);
+    volumeList.addVolume(archivalVolume.obtainReference());
+    volumeList.addVolume(diskVolume.obtainReference());
+
+    assertEquals(diskVolume.getMount(), archivalVolume.getMount());
+    String device = diskVolume.getMount();
+
+    // 1) getVolumeRef should return the correct reference.
+    assertEquals(diskVolume,
+        volumeList.getMountVolumeMap()
+            .getVolumeRefByMountAndStorageType(
+                device, StorageType.DISK).getVolume());
+    assertEquals(archivalVolume,
+        volumeList.getMountVolumeMap()
+            .getVolumeRefByMountAndStorageType(
+                device, StorageType.ARCHIVE).getVolume());
+
+    // 2) removeVolume should work as expected.
+    volumeList.removeVolume(diskVolume.getStorageLocation(), true);
+    assertNull(volumeList.getMountVolumeMap()
+        .getVolumeRefByMountAndStorageType(
+            device, StorageType.DISK));
+    assertEquals(archivalVolume, volumeList.getMountVolumeMap()
+        .getVolumeRefByMountAndStorageType(
+            device, StorageType.ARCHIVE).getVolume());
+  }
+
+  // Test dfs stats with same disk archival.
+  @Test
+  public void testDfsUsageStatWithSameDiskArchival() throws Exception {
+    File diskVolDir = new File(baseDir, "volume-disk");
+    File archivalVolDir = new File(baseDir, "volume-archival");
+    diskVolDir.mkdirs();
+    archivalVolDir.mkdirs();
+
+    long dfCapacity = 1100L;
+    double reservedForArchival = 0.75;
+    // Disk and Archive share the same du reserved.
+    long duReserved = 100L;
+    long diskDfsUsage = 100L;
+    long archivalDfsUsage = 200L;
+    long dfUsage = 700L;
+    long dfAvailable = 300L;
+
+    // Set up DISK and ARCHIVE volumes and capacity.
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, duReserved);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+        true);
+    conf.setDouble(DFSConfigKeys
+            .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
+        reservedForArchival);
+    FsVolumeImpl diskVolume = new FsVolumeImplBuilder()
+        .setConf(conf)
+        .setDataset(dataset)
+        .setStorageID("storage-id")
+        .setStorageDirectory(
+            new StorageDirectory(StorageLocation.parse(diskVolDir.getPath())))
+        .build();
+    FsVolumeImpl archivalVolume = new FsVolumeImplBuilder()
+        .setConf(conf)
+        .setDataset(dataset)
+        .setStorageID("storage-id")
+        .setStorageDirectory(
+            new StorageDirectory(
+                StorageLocation.parse("[ARCHIVE]" + archivalVolDir.getPath())))
+        .build();
+    FsVolumeImpl spyDiskVolume = Mockito.spy(diskVolume);
+    FsVolumeImpl spyArchivalVolume = Mockito.spy(archivalVolume);
+    long testDfCapacity = dfCapacity - duReserved;
+    spyDiskVolume.setCapacityForTesting(testDfCapacity);
+    spyArchivalVolume.setCapacityForTesting(testDfCapacity);
+    Mockito.doReturn(dfAvailable).when(spyDiskVolume).getDfAvailable();
+    Mockito.doReturn(dfAvailable).when(spyArchivalVolume).getDfAvailable();
+
+    MountVolumeMap mountVolumeMap = new MountVolumeMap(conf);
+    mountVolumeMap.addVolume(spyDiskVolume);
+    mountVolumeMap.addVolume(spyArchivalVolume);
+    Mockito.doReturn(mountVolumeMap).when(dataset).getMountVolumeMap();
+
+    // 1) getCapacity() should reflect the configured archive storage percentage.
+    long diskStorageTypeCapacity =
+        (long) ((dfCapacity - duReserved) * (1 - reservedForArchival));
+    assertEquals(diskStorageTypeCapacity, spyDiskVolume.getCapacity());
+    long archiveStorageTypeCapacity =
+        (long) ((dfCapacity - duReserved) * (reservedForArchival));
+    assertEquals(archiveStorageTypeCapacity, spyArchivalVolume.getCapacity());
+
+    // 2) getActualNonDfsUsed() should account for both DISK and ARCHIVE.
+    // expectedActualNonDfsUsage =
+    //   dfUsage - archivalDfsUsage - diskDfsUsage
+    long expectedActualNonDfsUsage = 400L;
+    Mockito.doReturn(diskDfsUsage)
+        .when(spyDiskVolume).getDfsUsed();
+    Mockito.doReturn(archivalDfsUsage)
+        .when(spyArchivalVolume).getDfsUsed();
+    Mockito.doReturn(dfUsage)
+        .when(spyDiskVolume).getDfUsed();
+    Mockito.doReturn(dfUsage)
+        .when(spyArchivalVolume).getDfUsed();
+    assertEquals(expectedActualNonDfsUsage,
+        spyDiskVolume.getActualNonDfsUsed());
+    assertEquals(expectedActualNonDfsUsage,
+        spyArchivalVolume.getActualNonDfsUsed());
+
+    // 3) When there is only one volume on a disk mount,
+    //    we allocate the full disk capacity regardless of the default ratio.
+    mountVolumeMap.removeVolume(spyArchivalVolume);
+    assertEquals(dfCapacity - duReserved, spyDiskVolume.getCapacity());
+  }
 }

@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;

+import org.apache.hadoop.fs.StorageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;

@@ -185,6 +186,58 @@ public class TestNamenodeCapacityReport {
         (namesystem.getCapacityUsed() + namesystem.getCapacityRemaining()
             + namesystem.getNonDfsUsedSpace() + fileCount * fs
             .getDefaultBlockSize()) - configCapacity < 1 * 1024);
     } finally {
       if (cluster != null) {
         cluster.shutdown();
       }
     }
   }

+  /**
+   * We split the disk into DISK/ARCHIVE volumes and test if the NN gets
+   * the correct stats.
+   */
+  @Test
+  public void testVolumeSizeWithSameDiskTiering() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+
+    // Set aside part of the total capacity as reserved.
+    long reserved = 10000;
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, reserved);
+
+    try {
+      double reserveForAchive = 0.3;
+      conf.setBoolean(DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING,
+          true);
+      conf.setDouble(DFSConfigKeys
+              .DFS_DATANODE_RESERVE_FOR_ARCHIVE_DEFAULT_PERCENTAGE,
+          reserveForAchive);
+      cluster = new MiniDFSCluster.Builder(conf).storageTypes(
+          new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}).build();
+      cluster.waitActive();
+
+      final FsDatasetTestUtils utils = cluster.getFsDatasetTestUtils(0);
+
+      long configCapacity = cluster.getNamesystem().getCapacityTotal();
+
+      // Disk capacity should be just the raw capacity,
+      // as the two volumes share the capacity.
+      long rawCapacity = utils.getRawCapacity();
+      long diskCapacity = (long) ((rawCapacity - reserved) * reserveForAchive)
+          + (long) ((rawCapacity - reserved) * (1 - reserveForAchive))
+          + reserved;
+
+      // Ensure reserved is not double counted.
+      assertEquals(configCapacity, diskCapacity - reserved);
+
+      DataNode dn = cluster.getDataNodes().get(0);
+      // Ensure nonDfsUsed is not double counted.
+      long singleVolumeUsed = dn.getFSDataset()
+          .getStorageReports(cluster.getNamesystem().getBlockPoolId())[0]
+          .getNonDfsUsed();
+      cluster.triggerHeartbeats();
+      assertTrue(cluster.getNamesystem().getCapacityUsed()
+          < singleVolumeUsed * 2);
+    }
+    finally {
+      if (cluster != null) {