HDFS-12712. [9806] Code style cleanup
This commit is contained in:
parent
80c3fec3a1
commit
8239e3afb3
|
@ -47,7 +47,6 @@ public final class HdfsConstants {
|
||||||
public static final String WARM_STORAGE_POLICY_NAME = "WARM";
|
public static final String WARM_STORAGE_POLICY_NAME = "WARM";
|
||||||
public static final byte COLD_STORAGE_POLICY_ID = 2;
|
public static final byte COLD_STORAGE_POLICY_ID = 2;
|
||||||
public static final String COLD_STORAGE_POLICY_NAME = "COLD";
|
public static final String COLD_STORAGE_POLICY_NAME = "COLD";
|
||||||
// branch HDFS-9806 XXX temporary until HDFS-7076
|
|
||||||
public static final byte PROVIDED_STORAGE_POLICY_ID = 1;
|
public static final byte PROVIDED_STORAGE_POLICY_ID = 1;
|
||||||
public static final String PROVIDED_STORAGE_POLICY_NAME = "PROVIDED";
|
public static final String PROVIDED_STORAGE_POLICY_NAME = "PROVIDED";
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.protocol;
|
package org.apache.hadoop.hdfs.protocol;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -40,6 +41,32 @@ import com.google.common.collect.Lists;
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class LocatedBlock {
|
public class LocatedBlock {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Comparator that ensures that a PROVIDED storage type is greater than any
|
||||||
|
* other storage type. Any other storage types are considered equal.
|
||||||
|
*/
|
||||||
|
private static class ProvidedLastComparator
|
||||||
|
implements Comparator<DatanodeInfoWithStorage>, Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 6441720011443190984L;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(DatanodeInfoWithStorage dns1,
|
||||||
|
DatanodeInfoWithStorage dns2) {
|
||||||
|
if (StorageType.PROVIDED.equals(dns1.getStorageType())
|
||||||
|
&& !StorageType.PROVIDED.equals(dns2.getStorageType())) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (!StorageType.PROVIDED.equals(dns1.getStorageType())
|
||||||
|
&& StorageType.PROVIDED.equals(dns2.getStorageType())) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
// Storage types of dns1 and dns2 are now both provided or not provided;
|
||||||
|
// thus, are essentially equal for the purpose of this comparator.
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private final ExtendedBlock b;
|
private final ExtendedBlock b;
|
||||||
private long offset; // offset of the first byte of the block in the file
|
private long offset; // offset of the first byte of the block in the file
|
||||||
private final DatanodeInfoWithStorage[] locs;
|
private final DatanodeInfoWithStorage[] locs;
|
||||||
|
@ -52,6 +79,10 @@ public class LocatedBlock {
|
||||||
// their locations are not part of this object
|
// their locations are not part of this object
|
||||||
private boolean corrupt;
|
private boolean corrupt;
|
||||||
private Token<BlockTokenIdentifier> blockToken = new Token<>();
|
private Token<BlockTokenIdentifier> blockToken = new Token<>();
|
||||||
|
|
||||||
|
// use one instance of the Provided comparator as it uses no state.
|
||||||
|
private static ProvidedLastComparator providedLastComparator =
|
||||||
|
new ProvidedLastComparator();
|
||||||
/**
|
/**
|
||||||
* List of cached datanode locations
|
* List of cached datanode locations
|
||||||
*/
|
*/
|
||||||
|
@ -156,29 +187,6 @@ public class LocatedBlock {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Comparator that ensures that a PROVIDED storage type is greater than
|
|
||||||
* any other storage type. Any other storage types are considered equal.
|
|
||||||
*/
|
|
||||||
private class ProvidedLastComparator
|
|
||||||
implements Comparator<DatanodeInfoWithStorage> {
|
|
||||||
@Override
|
|
||||||
public int compare(DatanodeInfoWithStorage dns1,
|
|
||||||
DatanodeInfoWithStorage dns2) {
|
|
||||||
if (StorageType.PROVIDED.equals(dns1.getStorageType())
|
|
||||||
&& !StorageType.PROVIDED.equals(dns2.getStorageType())) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
if (!StorageType.PROVIDED.equals(dns1.getStorageType())
|
|
||||||
&& StorageType.PROVIDED.equals(dns2.getStorageType())) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
// Storage types of dns1 and dns2 are now both provided or not provided;
|
|
||||||
// thus, are essentially equal for the purpose of this comparator.
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Moves all locations that have {@link StorageType}
|
* Moves all locations that have {@link StorageType}
|
||||||
* {@code PROVIDED} to the end of the locations array without
|
* {@code PROVIDED} to the end of the locations array without
|
||||||
|
@ -196,9 +204,8 @@ public class LocatedBlock {
|
||||||
}
|
}
|
||||||
// as this is a stable sort, for elements that are equal,
|
// as this is a stable sort, for elements that are equal,
|
||||||
// the current order of the elements is maintained
|
// the current order of the elements is maintained
|
||||||
Arrays.sort(locs, 0,
|
Arrays.sort(locs, 0, (activeLen < locs.length) ? activeLen : locs.length,
|
||||||
(activeLen < locs.length) ? activeLen : locs.length,
|
providedLastComparator);
|
||||||
new ProvidedLastComparator());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getStartOffset() {
|
public long getStartOffset() {
|
||||||
|
|
|
@ -192,7 +192,7 @@ public abstract class BlockInfo extends Block
|
||||||
DatanodeStorageInfo cur = getStorageInfo(idx);
|
DatanodeStorageInfo cur = getStorageInfo(idx);
|
||||||
if(cur != null) {
|
if(cur != null) {
|
||||||
if (cur.getStorageType() == StorageType.PROVIDED) {
|
if (cur.getStorageType() == StorageType.PROVIDED) {
|
||||||
//if block resides on provided storage, only match the storage ids
|
// if block resides on provided storage, only match the storage ids
|
||||||
if (dn.getStorageInfo(cur.getStorageID()) != null) {
|
if (dn.getStorageInfo(cur.getStorageID()) != null) {
|
||||||
// do not return here as we have to check the other
|
// do not return here as we have to check the other
|
||||||
// DatanodeStorageInfos for this block which could be local
|
// DatanodeStorageInfos for this block which could be local
|
||||||
|
|
|
@ -1240,7 +1240,6 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
|
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
|
||||||
final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(),
|
final ExtendedBlock eb = new ExtendedBlock(getBlockPoolId(),
|
||||||
blk);
|
blk);
|
||||||
//TODO use locatedBlocks builder??
|
|
||||||
return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
|
return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
|
||||||
false);
|
false);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2497,8 +2496,8 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
|
|
||||||
// To minimize startup time, we discard any second (or later) block reports
|
// To minimize startup time, we discard any second (or later) block reports
|
||||||
// that we receive while still in startup phase.
|
// that we receive while still in startup phase.
|
||||||
// !#! Register DN with provided storage, not with storage owned by DN
|
// Register DN with provided storage, not with storage owned by DN
|
||||||
// !#! DN should still have a ref to the DNStorageInfo
|
// DN should still have a ref to the DNStorageInfo.
|
||||||
DatanodeStorageInfo storageInfo =
|
DatanodeStorageInfo storageInfo =
|
||||||
providedStorageMap.getStorage(node, storage);
|
providedStorageMap.getStorage(node, storage);
|
||||||
|
|
||||||
|
|
|
@ -532,7 +532,7 @@ public class DatanodeManager {
|
||||||
} else {
|
} else {
|
||||||
networktopology.sortByDistance(client, lb.getLocations(), activeLen);
|
networktopology.sortByDistance(client, lb.getLocations(), activeLen);
|
||||||
}
|
}
|
||||||
//move PROVIDED storage to the end to prefer local replicas.
|
// move PROVIDED storage to the end to prefer local replicas.
|
||||||
lb.moveProvidedToEnd(activeLen);
|
lb.moveProvidedToEnd(activeLen);
|
||||||
// must update cache since we modified locations array
|
// must update cache since we modified locations array
|
||||||
lb.updateCachedStorageInfo();
|
lb.updateCachedStorageInfo();
|
||||||
|
|
|
@ -294,6 +294,7 @@ public class ProvidedStorageMap {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
LocatedBlocks build(DatanodeDescriptor client) {
|
LocatedBlocks build(DatanodeDescriptor client) {
|
||||||
|
// TODO choose provided locations close to the client.
|
||||||
return new LocatedBlocks(
|
return new LocatedBlocks(
|
||||||
flen, isUC, blocks, last, lastComplete, feInfo, ecPolicy);
|
flen, isUC, blocks, last, lastComplete, feInfo, ecPolicy);
|
||||||
}
|
}
|
||||||
|
@ -333,7 +334,6 @@ public class ProvidedStorageMap {
|
||||||
DatanodeDescriptor dn, DatanodeStorage s) {
|
DatanodeDescriptor dn, DatanodeStorage s) {
|
||||||
dns.put(dn.getDatanodeUuid(), dn);
|
dns.put(dn.getDatanodeUuid(), dn);
|
||||||
dnR.add(dn);
|
dnR.add(dn);
|
||||||
// TODO: maintain separate RPC ident per dn
|
|
||||||
return storageMap.get(s.getStorageID());
|
return storageMap.get(s.getStorageID());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -522,7 +522,7 @@ public class ProvidedStorageMap {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getNumberOfBlocks() {
|
public int getNumberOfBlocks() {
|
||||||
// VERIFY: only printed for debugging
|
// is ignored for ProvidedBlockList.
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -422,7 +422,7 @@ public abstract class Storage extends StorageInfo {
|
||||||
public void clearDirectory() throws IOException {
|
public void clearDirectory() throws IOException {
|
||||||
File curDir = this.getCurrentDir();
|
File curDir = this.getCurrentDir();
|
||||||
if (curDir == null) {
|
if (curDir == null) {
|
||||||
//if the directory is null, there is nothing to do.
|
// if the directory is null, there is nothing to do.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (curDir.exists()) {
|
if (curDir.exists()) {
|
||||||
|
@ -638,7 +638,7 @@ public abstract class Storage extends StorageInfo {
|
||||||
|
|
||||||
if (location != null &&
|
if (location != null &&
|
||||||
location.getStorageType() == StorageType.PROVIDED) {
|
location.getStorageType() == StorageType.PROVIDED) {
|
||||||
//currently we assume that PROVIDED storages are always NORMAL
|
// currently we assume that PROVIDED storages are always NORMAL
|
||||||
return StorageState.NORMAL;
|
return StorageState.NORMAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -764,7 +764,7 @@ public abstract class Storage extends StorageInfo {
|
||||||
public void doRecover(StorageState curState) throws IOException {
|
public void doRecover(StorageState curState) throws IOException {
|
||||||
File curDir = getCurrentDir();
|
File curDir = getCurrentDir();
|
||||||
if (curDir == null || root == null) {
|
if (curDir == null || root == null) {
|
||||||
//at this point, we do not support recovery on PROVIDED storages
|
// at this point, we do not support recovery on PROVIDED storages
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String rootPath = root.getCanonicalPath();
|
String rootPath = root.getCanonicalPath();
|
||||||
|
|
|
@ -471,7 +471,7 @@ public class TextFileRegionAliasMap
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
//nothing to do;
|
// nothing to do;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
|
@ -443,7 +443,7 @@ public class BlockPoolSliceStorage extends Storage {
|
||||||
LayoutVersion.Feature.FEDERATION, layoutVersion)) {
|
LayoutVersion.Feature.FEDERATION, layoutVersion)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
//no upgrades for storage directories that are PROVIDED
|
// no upgrades for storage directories that are PROVIDED
|
||||||
if (bpSd.getRoot() == null) {
|
if (bpSd.getRoot() == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -640,7 +640,6 @@ public class BlockPoolSliceStorage extends Storage {
|
||||||
* that holds the snapshot.
|
* that holds the snapshot.
|
||||||
*/
|
*/
|
||||||
void doFinalize(File dnCurDir) throws IOException {
|
void doFinalize(File dnCurDir) throws IOException {
|
||||||
LOG.info("doFinalize: " + dnCurDir);
|
|
||||||
if (dnCurDir == null) {
|
if (dnCurDir == null) {
|
||||||
return; //we do nothing if the directory is null
|
return; //we do nothing if the directory is null
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,8 +149,8 @@ public class DataStorage extends Storage {
|
||||||
final String oldStorageID = sd.getStorageUuid();
|
final String oldStorageID = sd.getStorageUuid();
|
||||||
if (sd.getStorageLocation() != null &&
|
if (sd.getStorageLocation() != null &&
|
||||||
sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
|
sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
|
||||||
// We only support one provided storage per datanode for now.
|
// Only one provided storage id is supported.
|
||||||
// TODO support multiple provided storage ids per datanode.
|
// TODO support multiple provided storage ids
|
||||||
sd.setStorageUuid(conf.get(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
|
sd.setStorageUuid(conf.get(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
|
||||||
DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT));
|
DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT));
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -310,7 +310,6 @@ public abstract class ProvidedReplica extends ReplicaInfo {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int compareWith(ScanInfo info) {
|
public int compareWith(ScanInfo info) {
|
||||||
//local scanning cannot find any provided blocks.
|
|
||||||
if (info.getFileRegion().equals(
|
if (info.getFileRegion().equals(
|
||||||
new FileRegion(this.getBlockId(), new Path(getRemoteURI()),
|
new FileRegion(this.getBlockId(), new Path(getRemoteURI()),
|
||||||
fileOffset, this.getNumBytes(), this.getGenerationStamp()))) {
|
fileOffset, this.getNumBytes(), this.getGenerationStamp()))) {
|
||||||
|
|
|
@ -108,10 +108,10 @@ public class StorageLocation
|
||||||
}
|
}
|
||||||
if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED ||
|
if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED ||
|
||||||
storageType == StorageType.PROVIDED) {
|
storageType == StorageType.PROVIDED) {
|
||||||
//only one of these is PROVIDED; so it cannot be a match!
|
// only one PROVIDED storage directory can exist; so this cannot match!
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
//both storage directories are local
|
// both storage directories are local
|
||||||
return this.getBpURI(bpid, Storage.STORAGE_DIR_CURRENT).normalize()
|
return this.getBpURI(bpid, Storage.STORAGE_DIR_CURRENT).normalize()
|
||||||
.equals(sd.getRoot().toURI().normalize());
|
.equals(sd.getRoot().toURI().normalize());
|
||||||
}
|
}
|
||||||
|
@ -212,7 +212,9 @@ public class StorageLocation
|
||||||
conf = new HdfsConfiguration();
|
conf = new HdfsConfiguration();
|
||||||
}
|
}
|
||||||
if (storageType == StorageType.PROVIDED) {
|
if (storageType == StorageType.PROVIDED) {
|
||||||
//skip creation if the storage type is PROVIDED
|
// skip creation if the storage type is PROVIDED
|
||||||
|
Storage.LOG.info("Skipping creating directory for block pool "
|
||||||
|
+ blockPoolID + " for PROVIDED storage location " + this);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,8 +233,8 @@ public class StorageLocation
|
||||||
|
|
||||||
@Override // Checkable
|
@Override // Checkable
|
||||||
public VolumeCheckResult check(CheckContext context) throws IOException {
|
public VolumeCheckResult check(CheckContext context) throws IOException {
|
||||||
//we assume provided storage locations are always healthy,
|
// assume provided storage locations are always healthy,
|
||||||
//and check only for local storages.
|
// and check only for local storages.
|
||||||
if (storageType != StorageType.PROVIDED) {
|
if (storageType != StorageType.PROVIDED) {
|
||||||
DiskChecker.checkDir(
|
DiskChecker.checkDir(
|
||||||
context.localFileSystem,
|
context.localFileSystem,
|
||||||
|
|
|
@ -1760,7 +1760,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
Set<String> missingVolumesReported = new HashSet<>();
|
Set<String> missingVolumesReported = new HashSet<>();
|
||||||
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
||||||
//skip blocks in PROVIDED storage
|
// skip PROVIDED replicas.
|
||||||
if (b.getVolume().getStorageType() == StorageType.PROVIDED) {
|
if (b.getVolume().getStorageType() == StorageType.PROVIDED) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -2281,7 +2281,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
|
|
||||||
if (vol.getStorageType() == StorageType.PROVIDED) {
|
if (vol.getStorageType() == StorageType.PROVIDED) {
|
||||||
if (memBlockInfo == null) {
|
if (memBlockInfo == null) {
|
||||||
//replica exists on provided store but not in memory
|
// replica exists on provided store but not in memory
|
||||||
ReplicaInfo diskBlockInfo =
|
ReplicaInfo diskBlockInfo =
|
||||||
new ReplicaBuilder(ReplicaState.FINALIZED)
|
new ReplicaBuilder(ReplicaState.FINALIZED)
|
||||||
.setFileRegion(scanInfo.getFileRegion())
|
.setFileRegion(scanInfo.getFileRegion())
|
||||||
|
@ -2292,7 +2292,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
volumeMap.add(bpid, diskBlockInfo);
|
volumeMap.add(bpid, diskBlockInfo);
|
||||||
LOG.warn("Added missing block to memory " + diskBlockInfo);
|
LOG.warn("Added missing block to memory " + diskBlockInfo);
|
||||||
} else {
|
} else {
|
||||||
//replica exists in memory but not in the provided store
|
// replica exists in memory but not in the provided store
|
||||||
volumeMap.remove(bpid, blockId);
|
volumeMap.remove(bpid, blockId);
|
||||||
LOG.warn("Deleting missing provided block " + memBlockInfo);
|
LOG.warn("Deleting missing provided block " + memBlockInfo);
|
||||||
}
|
}
|
||||||
|
|
|
@ -224,7 +224,7 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shutdown(BlockListAsLongs blocksListsAsLongs) {
|
public void shutdown(BlockListAsLongs blocksListsAsLongs) {
|
||||||
//nothing to do!
|
// nothing to do!
|
||||||
}
|
}
|
||||||
|
|
||||||
public void compileReport(LinkedList<ScanInfo> report,
|
public void compileReport(LinkedList<ScanInfo> report,
|
||||||
|
@ -264,7 +264,7 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
|
||||||
new ConcurrentHashMap<String, ProvidedBlockPoolSlice>();
|
new ConcurrentHashMap<String, ProvidedBlockPoolSlice>();
|
||||||
|
|
||||||
private ProvidedVolumeDF df;
|
private ProvidedVolumeDF df;
|
||||||
//the remote FileSystem to which this ProvidedVolume points to.
|
// the remote FileSystem to which this ProvidedVolume points to.
|
||||||
private FileSystem remoteFS;
|
private FileSystem remoteFS;
|
||||||
|
|
||||||
ProvidedVolumeImpl(FsDatasetImpl dataset, String storageID,
|
ProvidedVolumeImpl(FsDatasetImpl dataset, String storageID,
|
||||||
|
@ -395,9 +395,9 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private boolean atEnd;
|
private boolean atEnd;
|
||||||
|
|
||||||
//The id of the last block read when the state of the iterator is saved.
|
// The id of the last block read when the state of the iterator is saved.
|
||||||
//This implementation assumes that provided blocks are returned
|
// This implementation assumes that provided blocks are returned
|
||||||
//in sorted order of the block ids.
|
// in sorted order of the block ids.
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private long lastBlockId;
|
private long lastBlockId;
|
||||||
}
|
}
|
||||||
|
@ -421,7 +421,7 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
//No action needed
|
blockAliasMap.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -467,14 +467,14 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void save() throws IOException {
|
public void save() throws IOException {
|
||||||
//We do not persist the state of this iterator anywhere, locally.
|
// We do not persist the state of this iterator locally.
|
||||||
//We just re-scan provided volumes as necessary.
|
// We just re-scan provided volumes as necessary.
|
||||||
state.lastSavedMs = Time.now();
|
state.lastSavedMs = Time.now();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setMaxStalenessMs(long maxStalenessMs) {
|
public void setMaxStalenessMs(long maxStalenessMs) {
|
||||||
//do not use max staleness
|
// do not use max staleness
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -493,7 +493,7 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void load() throws IOException {
|
public void load() throws IOException {
|
||||||
//on load, we just rewind the iterator for provided volumes.
|
// on load, we just rewind the iterator for provided volumes.
|
||||||
rewind();
|
rewind();
|
||||||
LOG.trace("load({}, {}): loaded iterator {}: {}", getStorageID(),
|
LOG.trace("load({}, {}): loaded iterator {}: {}", getStorageID(),
|
||||||
bpid, name, WRITER.writeValueAsString(state));
|
bpid, name, WRITER.writeValueAsString(state));
|
||||||
|
@ -615,7 +615,6 @@ class ProvidedVolumeImpl extends FsVolumeImpl {
|
||||||
LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
|
LinkedList<ScanInfo> report, ReportCompiler reportCompiler)
|
||||||
throws InterruptedException, IOException {
|
throws InterruptedException, IOException {
|
||||||
LOG.info("Compiling report for volume: " + this + " bpid " + bpid);
|
LOG.info("Compiling report for volume: " + this + " bpid " + bpid);
|
||||||
//get the report from the appropriate block pool.
|
|
||||||
if(bpSlices.containsKey(bpid)) {
|
if(bpSlices.containsKey(bpid)) {
|
||||||
bpSlices.get(bpid).compileReport(report, reportCompiler);
|
bpSlices.get(bpid).compileReport(report, reportCompiler);
|
||||||
}
|
}
|
||||||
|
|
|
@ -144,9 +144,11 @@ public class TestBlockStoragePolicy {
|
||||||
expectedPolicyStrings.put(ALLSSD, "BlockStoragePolicy{ALL_SSD:" + ALLSSD +
|
expectedPolicyStrings.put(ALLSSD, "BlockStoragePolicy{ALL_SSD:" + ALLSSD +
|
||||||
", storageTypes=[SSD], creationFallbacks=[DISK], " +
|
", storageTypes=[SSD], creationFallbacks=[DISK], " +
|
||||||
"replicationFallbacks=[DISK]}");
|
"replicationFallbacks=[DISK]}");
|
||||||
expectedPolicyStrings.put(PROVIDED, "BlockStoragePolicy{PROVIDED:" + PROVIDED +
|
expectedPolicyStrings.put(PROVIDED,
|
||||||
", storageTypes=[PROVIDED, DISK], creationFallbacks=[PROVIDED, DISK], " +
|
"BlockStoragePolicy{PROVIDED:" + PROVIDED
|
||||||
"replicationFallbacks=[PROVIDED, DISK]}");
|
+ ", storageTypes=[PROVIDED, DISK], "
|
||||||
|
+ "creationFallbacks=[PROVIDED, DISK], "
|
||||||
|
+ "replicationFallbacks=[PROVIDED, DISK]}");
|
||||||
|
|
||||||
for(byte i = 1; i < 16; i++) {
|
for(byte i = 1; i < 16; i++) {
|
||||||
final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i);
|
final BlockStoragePolicy policy = POLICY_SUITE.getPolicy(i);
|
||||||
|
|
|
@ -325,11 +325,12 @@ public class TestDatanodeManager {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testBadScript() throws IOException, URISyntaxException {
|
public void testBadScript() throws IOException, URISyntaxException {
|
||||||
HelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script"), 0);
|
HelperFunction("/" + Shell.appendScriptExtension("topology-broken-script"),
|
||||||
|
0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test with different sorting functions but include datanodes
|
* Test with different sorting functions but include datanodes.
|
||||||
* with provided storage
|
* with provided storage
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
* @throws URISyntaxException
|
* @throws URISyntaxException
|
||||||
|
|
|
@ -73,13 +73,13 @@ public class TestProvidedStorageMap {
|
||||||
nameSystemLock, bm, conf);
|
nameSystemLock, bm, conf);
|
||||||
DatanodeStorageInfo providedMapStorage =
|
DatanodeStorageInfo providedMapStorage =
|
||||||
providedMap.getProvidedStorageInfo();
|
providedMap.getProvidedStorageInfo();
|
||||||
//the provided storage cannot be null
|
// the provided storage cannot be null
|
||||||
assertNotNull(providedMapStorage);
|
assertNotNull(providedMapStorage);
|
||||||
|
|
||||||
//create a datanode
|
// create a datanode
|
||||||
DatanodeDescriptor dn1 = createDatanodeDescriptor(5000);
|
DatanodeDescriptor dn1 = createDatanodeDescriptor(5000);
|
||||||
|
|
||||||
//associate two storages to the datanode
|
// associate two storages to the datanode
|
||||||
DatanodeStorage dn1ProvidedStorage = new DatanodeStorage(
|
DatanodeStorage dn1ProvidedStorage = new DatanodeStorage(
|
||||||
providedStorageID,
|
providedStorageID,
|
||||||
DatanodeStorage.State.NORMAL,
|
DatanodeStorage.State.NORMAL,
|
||||||
|
@ -96,15 +96,15 @@ public class TestProvidedStorageMap {
|
||||||
dns1Provided == providedMapStorage);
|
dns1Provided == providedMapStorage);
|
||||||
assertTrue("Disk storage has not yet been registered with block manager",
|
assertTrue("Disk storage has not yet been registered with block manager",
|
||||||
dns1Disk == null);
|
dns1Disk == null);
|
||||||
//add the disk storage to the datanode.
|
// add the disk storage to the datanode.
|
||||||
DatanodeStorageInfo dnsDisk = new DatanodeStorageInfo(dn1, dn1DiskStorage);
|
DatanodeStorageInfo dnsDisk = new DatanodeStorageInfo(dn1, dn1DiskStorage);
|
||||||
dn1.injectStorage(dnsDisk);
|
dn1.injectStorage(dnsDisk);
|
||||||
assertTrue("Disk storage must match the injected storage info",
|
assertTrue("Disk storage must match the injected storage info",
|
||||||
dnsDisk == providedMap.getStorage(dn1, dn1DiskStorage));
|
dnsDisk == providedMap.getStorage(dn1, dn1DiskStorage));
|
||||||
|
|
||||||
//create a 2nd datanode
|
// create a 2nd datanode
|
||||||
DatanodeDescriptor dn2 = createDatanodeDescriptor(5010);
|
DatanodeDescriptor dn2 = createDatanodeDescriptor(5010);
|
||||||
//associate a provided storage with the datanode
|
// associate a provided storage with the datanode
|
||||||
DatanodeStorage dn2ProvidedStorage = new DatanodeStorage(
|
DatanodeStorage dn2ProvidedStorage = new DatanodeStorage(
|
||||||
providedStorageID,
|
providedStorageID,
|
||||||
DatanodeStorage.State.NORMAL,
|
DatanodeStorage.State.NORMAL,
|
||||||
|
|
|
@ -50,9 +50,9 @@ public class TestProvidedReplicaImpl {
|
||||||
private static final String BASE_DIR =
|
private static final String BASE_DIR =
|
||||||
new FileSystemTestHelper().getTestRootDir();
|
new FileSystemTestHelper().getTestRootDir();
|
||||||
private static final String FILE_NAME = "provided-test";
|
private static final String FILE_NAME = "provided-test";
|
||||||
//length of the file that is associated with the provided blocks.
|
// length of the file that is associated with the provided blocks.
|
||||||
private static final long FILE_LEN = 128 * 1024 * 10L + 64 * 1024;
|
private static final long FILE_LEN = 128 * 1024 * 10L + 64 * 1024;
|
||||||
//length of each provided block.
|
// length of each provided block.
|
||||||
private static final long BLK_LEN = 128 * 1024L;
|
private static final long BLK_LEN = 128 * 1024L;
|
||||||
|
|
||||||
private static List<ProvidedReplica> replicas;
|
private static List<ProvidedReplica> replicas;
|
||||||
|
@ -63,7 +63,6 @@ public class TestProvidedReplicaImpl {
|
||||||
if(!newFile.exists()) {
|
if(!newFile.exists()) {
|
||||||
newFile.createNewFile();
|
newFile.createNewFile();
|
||||||
OutputStream writer = new FileOutputStream(newFile.getAbsolutePath());
|
OutputStream writer = new FileOutputStream(newFile.getAbsolutePath());
|
||||||
//FILE_LEN is length in bytes.
|
|
||||||
byte[] bytes = new byte[1];
|
byte[] bytes = new byte[1];
|
||||||
bytes[0] = (byte) 0;
|
bytes[0] = (byte) 0;
|
||||||
for(int i=0; i< FILE_LEN; i++) {
|
for(int i=0; i< FILE_LEN; i++) {
|
||||||
|
@ -106,7 +105,7 @@ public class TestProvidedReplicaImpl {
|
||||||
* @param dataLength length
|
* @param dataLength length
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void verifyReplicaContents(File file,
|
public static void verifyReplicaContents(File file,
|
||||||
InputStream ins, long fileOffset, long dataLength)
|
InputStream ins, long fileOffset, long dataLength)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
|
@ -142,9 +141,9 @@ public class TestProvidedReplicaImpl {
|
||||||
public void testProvidedReplicaRead() throws IOException {
|
public void testProvidedReplicaRead() throws IOException {
|
||||||
|
|
||||||
File providedFile = new File(BASE_DIR, FILE_NAME);
|
File providedFile = new File(BASE_DIR, FILE_NAME);
|
||||||
for(int i=0; i < replicas.size(); i++) {
|
for (int i = 0; i < replicas.size(); i++) {
|
||||||
ProvidedReplica replica = replicas.get(i);
|
ProvidedReplica replica = replicas.get(i);
|
||||||
//block data should exist!
|
// block data should exist!
|
||||||
assertTrue(replica.blockDataExists());
|
assertTrue(replica.blockDataExists());
|
||||||
assertEquals(providedFile.toURI(), replica.getBlockURI());
|
assertEquals(providedFile.toURI(), replica.getBlockURI());
|
||||||
verifyReplicaContents(providedFile, replica.getDataInputStream(0),
|
verifyReplicaContents(providedFile, replica.getDataInputStream(0),
|
||||||
|
@ -153,7 +152,7 @@ public class TestProvidedReplicaImpl {
|
||||||
LOG.info("All replica contents verified");
|
LOG.info("All replica contents verified");
|
||||||
|
|
||||||
providedFile.delete();
|
providedFile.delete();
|
||||||
//the block data should no longer be found!
|
// the block data should no longer be found!
|
||||||
for(int i=0; i < replicas.size(); i++) {
|
for(int i=0; i < replicas.size(); i++) {
|
||||||
ProvidedReplica replica = replicas.get(i);
|
ProvidedReplica replica = replicas.get(i);
|
||||||
assertTrue(!replica.blockDataExists());
|
assertTrue(!replica.blockDataExists());
|
||||||
|
|
|
@ -26,8 +26,6 @@ import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
@ -35,9 +33,6 @@ import java.io.OutputStreamWriter;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.nio.channels.Channels;
|
|
||||||
import java.nio.channels.ReadableByteChannel;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -76,6 +71,7 @@ import org.apache.hadoop.hdfs.server.datanode.ProvidedReplica;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
|
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.TestProvidedReplicaImpl;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
|
||||||
|
@ -97,7 +93,7 @@ public class TestProvidedImpl {
|
||||||
private static final String BASE_DIR =
|
private static final String BASE_DIR =
|
||||||
new FileSystemTestHelper().getTestRootDir();
|
new FileSystemTestHelper().getTestRootDir();
|
||||||
private static final int NUM_LOCAL_INIT_VOLUMES = 1;
|
private static final int NUM_LOCAL_INIT_VOLUMES = 1;
|
||||||
//only support one provided volume for now.
|
// only support one provided volume for now.
|
||||||
private static final int NUM_PROVIDED_INIT_VOLUMES = 1;
|
private static final int NUM_PROVIDED_INIT_VOLUMES = 1;
|
||||||
private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
|
private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
|
||||||
private static final int NUM_PROVIDED_BLKS = 10;
|
private static final int NUM_PROVIDED_BLKS = 10;
|
||||||
|
@ -168,7 +164,7 @@ public class TestProvidedImpl {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void remove() {
|
public void remove() {
|
||||||
//do nothing.
|
// do nothing.
|
||||||
}
|
}
|
||||||
|
|
||||||
public void resetMinBlockId(int minId) {
|
public void resetMinBlockId(int minId) {
|
||||||
|
@ -314,33 +310,6 @@ public class TestProvidedImpl {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void compareBlkFile(InputStream ins, String filepath)
|
|
||||||
throws FileNotFoundException, IOException {
|
|
||||||
try (ReadableByteChannel i = Channels.newChannel(
|
|
||||||
new FileInputStream(new File(filepath)))) {
|
|
||||||
try (ReadableByteChannel j = Channels.newChannel(ins)) {
|
|
||||||
ByteBuffer ib = ByteBuffer.allocate(4096);
|
|
||||||
ByteBuffer jb = ByteBuffer.allocate(4096);
|
|
||||||
while (true) {
|
|
||||||
int il = i.read(ib);
|
|
||||||
int jl = j.read(jb);
|
|
||||||
if (il < 0 || jl < 0) {
|
|
||||||
assertEquals(il, jl);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
ib.flip();
|
|
||||||
jb.flip();
|
|
||||||
int cmp = Math.min(ib.remaining(), jb.remaining());
|
|
||||||
for (int k = 0; k < cmp; ++k) {
|
|
||||||
assertEquals(ib.get(), jb.get());
|
|
||||||
}
|
|
||||||
ib.compact();
|
|
||||||
jb.compact();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws IOException {
|
public void setUp() throws IOException {
|
||||||
datanode = mock(DataNode.class);
|
datanode = mock(DataNode.class);
|
||||||
|
@ -392,7 +361,7 @@ public class TestProvidedImpl {
|
||||||
assertEquals(0, dataset.getNumFailedVolumes());
|
assertEquals(0, dataset.getNumFailedVolumes());
|
||||||
|
|
||||||
for (int i = 0; i < providedVolumes.size(); i++) {
|
for (int i = 0; i < providedVolumes.size(); i++) {
|
||||||
//check basic information about provided volume
|
// check basic information about provided volume
|
||||||
assertEquals(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT,
|
assertEquals(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT,
|
||||||
providedVolumes.get(i).getStorageID());
|
providedVolumes.get(i).getStorageID());
|
||||||
assertEquals(StorageType.PROVIDED,
|
assertEquals(StorageType.PROVIDED,
|
||||||
|
@ -400,7 +369,7 @@ public class TestProvidedImpl {
|
||||||
|
|
||||||
long space = providedVolumes.get(i).getBlockPoolUsed(
|
long space = providedVolumes.get(i).getBlockPoolUsed(
|
||||||
BLOCK_POOL_IDS[CHOSEN_BP_ID]);
|
BLOCK_POOL_IDS[CHOSEN_BP_ID]);
|
||||||
//check the df stats of the volume
|
// check the df stats of the volume
|
||||||
assertEquals(spaceUsed, space);
|
assertEquals(spaceUsed, space);
|
||||||
assertEquals(NUM_PROVIDED_BLKS, providedVolumes.get(i).getNumBlocks());
|
assertEquals(NUM_PROVIDED_BLKS, providedVolumes.get(i).getNumBlocks());
|
||||||
|
|
||||||
|
@ -409,7 +378,7 @@ public class TestProvidedImpl {
|
||||||
try {
|
try {
|
||||||
assertEquals(0, providedVolumes.get(i)
|
assertEquals(0, providedVolumes.get(i)
|
||||||
.getBlockPoolUsed(BLOCK_POOL_IDS[1 - CHOSEN_BP_ID]));
|
.getBlockPoolUsed(BLOCK_POOL_IDS[1 - CHOSEN_BP_ID]));
|
||||||
//should not be triggered
|
// should not be triggered
|
||||||
assertTrue(false);
|
assertTrue(false);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.info("Expected exception: " + e);
|
LOG.info("Expected exception: " + e);
|
||||||
|
@ -428,7 +397,7 @@ public class TestProvidedImpl {
|
||||||
assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
|
assertEquals(vol.getBlockPoolList().length, BLOCK_POOL_IDS.length);
|
||||||
for (int j = 0; j < BLOCK_POOL_IDS.length; j++) {
|
for (int j = 0; j < BLOCK_POOL_IDS.length; j++) {
|
||||||
if (j != CHOSEN_BP_ID) {
|
if (j != CHOSEN_BP_ID) {
|
||||||
//this block pool should not have any blocks
|
// this block pool should not have any blocks
|
||||||
assertEquals(null, volumeMap.replicas(BLOCK_POOL_IDS[j]));
|
assertEquals(null, volumeMap.replicas(BLOCK_POOL_IDS[j]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -445,7 +414,8 @@ public class TestProvidedImpl {
|
||||||
HdfsConstants.GRANDFATHER_GENERATION_STAMP);
|
HdfsConstants.GRANDFATHER_GENERATION_STAMP);
|
||||||
InputStream ins = dataset.getBlockInputStream(eb, 0);
|
InputStream ins = dataset.getBlockInputStream(eb, 0);
|
||||||
String filepath = blkToPathMap.get((long) id);
|
String filepath = blkToPathMap.get((long) id);
|
||||||
compareBlkFile(ins, filepath);
|
TestProvidedReplicaImpl.verifyReplicaContents(new File(filepath), ins, 0,
|
||||||
|
BLK_LEN);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -462,7 +432,7 @@ public class TestProvidedImpl {
|
||||||
ExtendedBlock eb = iter.nextBlock();
|
ExtendedBlock eb = iter.nextBlock();
|
||||||
long blkId = eb.getBlockId();
|
long blkId = eb.getBlockId();
|
||||||
assertTrue(blkId >= MIN_BLK_ID && blkId < NUM_PROVIDED_BLKS);
|
assertTrue(blkId >= MIN_BLK_ID && blkId < NUM_PROVIDED_BLKS);
|
||||||
//all block ids must be unique!
|
// all block ids must be unique!
|
||||||
assertTrue(!blockIdsUsed.contains(blkId));
|
assertTrue(!blockIdsUsed.contains(blkId));
|
||||||
blockIdsUsed.add(blkId);
|
blockIdsUsed.add(blkId);
|
||||||
}
|
}
|
||||||
|
@ -473,14 +443,14 @@ public class TestProvidedImpl {
|
||||||
while(!iter.atEnd()) {
|
while(!iter.atEnd()) {
|
||||||
ExtendedBlock eb = iter.nextBlock();
|
ExtendedBlock eb = iter.nextBlock();
|
||||||
long blkId = eb.getBlockId();
|
long blkId = eb.getBlockId();
|
||||||
//the block should have already appeared in the first scan.
|
// the block should have already appeared in the first scan.
|
||||||
assertTrue(blockIdsUsed.contains(blkId));
|
assertTrue(blockIdsUsed.contains(blkId));
|
||||||
blockIdsUsed.remove(blkId);
|
blockIdsUsed.remove(blkId);
|
||||||
}
|
}
|
||||||
//none of the blocks should remain in blockIdsUsed
|
// none of the blocks should remain in blockIdsUsed
|
||||||
assertEquals(0, blockIdsUsed.size());
|
assertEquals(0, blockIdsUsed.size());
|
||||||
|
|
||||||
//the other block pool should not contain any blocks!
|
// the other block pool should not contain any blocks!
|
||||||
BlockIterator nonProvidedBpIter =
|
BlockIterator nonProvidedBpIter =
|
||||||
vol.newBlockIterator(BLOCK_POOL_IDS[1 - CHOSEN_BP_ID], "temp");
|
vol.newBlockIterator(BLOCK_POOL_IDS[1 - CHOSEN_BP_ID], "temp");
|
||||||
assertEquals(null, nonProvidedBpIter.nextBlock());
|
assertEquals(null, nonProvidedBpIter.nextBlock());
|
||||||
|
@ -513,8 +483,8 @@ public class TestProvidedImpl {
|
||||||
public void testProvidedVolumeContents() throws IOException {
|
public void testProvidedVolumeContents() throws IOException {
|
||||||
int expectedBlocks = 5;
|
int expectedBlocks = 5;
|
||||||
int minId = 0;
|
int minId = 0;
|
||||||
//use a path which has the same prefix as providedBasePath
|
// use a path which has the same prefix as providedBasePath
|
||||||
//all these blocks can belong to the provided volume
|
// all these blocks can belong to the provided volume
|
||||||
int blocksFound = getBlocksInProvidedVolumes(providedBasePath + "/test1/",
|
int blocksFound = getBlocksInProvidedVolumes(providedBasePath + "/test1/",
|
||||||
expectedBlocks, minId);
|
expectedBlocks, minId);
|
||||||
assertEquals(
|
assertEquals(
|
||||||
|
@ -525,8 +495,8 @@ public class TestProvidedImpl {
|
||||||
assertEquals(
|
assertEquals(
|
||||||
"Number of blocks in provided volumes should be " + expectedBlocks,
|
"Number of blocks in provided volumes should be " + expectedBlocks,
|
||||||
expectedBlocks, blocksFound);
|
expectedBlocks, blocksFound);
|
||||||
//use a path that is entirely different from the providedBasePath
|
// use a path that is entirely different from the providedBasePath
|
||||||
//none of these blocks can belong to the volume
|
// none of these blocks can belong to the volume
|
||||||
blocksFound =
|
blocksFound =
|
||||||
getBlocksInProvidedVolumes("randomtest1/", expectedBlocks, minId);
|
getBlocksInProvidedVolumes("randomtest1/", expectedBlocks, minId);
|
||||||
assertEquals("Number of blocks in provided volumes should be 0", 0,
|
assertEquals("Number of blocks in provided volumes should be 0", 0,
|
||||||
|
|
|
@ -23,8 +23,8 @@
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-fs2img</artifactId>
|
<artifactId>hadoop-fs2img</artifactId>
|
||||||
<version>3.1.0-SNAPSHOT</version>
|
<version>3.1.0-SNAPSHOT</version>
|
||||||
<description>fs2img</description>
|
<description>Apache Hadoop Image Generation Tool</description>
|
||||||
<name>fs2img</name>
|
<name>Apache Hadoop Image Generation Tool</name>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
|
|
|
@ -125,7 +125,8 @@ public class FileSystemImage implements Tool {
|
||||||
opts.blockPoolID(o.getValue());
|
opts.blockPoolID(o.getValue());
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new UnsupportedOperationException("Internal error");
|
throw new UnsupportedOperationException(
|
||||||
|
"Unknown option: " + o.getOpt());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -236,7 +236,7 @@ public class ImageWriter implements Closeable {
|
||||||
if (null == e) {
|
if (null == e) {
|
||||||
return super.put(p, b);
|
return super.put(p, b);
|
||||||
}
|
}
|
||||||
//merge
|
// merge
|
||||||
e.addAllChildren(b.getChildrenList());
|
e.addAllChildren(b.getChildrenList());
|
||||||
// not strictly conforming
|
// not strictly conforming
|
||||||
return e;
|
return e;
|
||||||
|
@ -265,7 +265,6 @@ public class ImageWriter implements Closeable {
|
||||||
e.writeDelimitedTo(dirs);
|
e.writeDelimitedTo(dirs);
|
||||||
}
|
}
|
||||||
|
|
||||||
// from FSImageFormatProtobuf... why not just read position from the stream?
|
|
||||||
private static int getOndiskSize(com.google.protobuf.GeneratedMessage s) {
|
private static int getOndiskSize(com.google.protobuf.GeneratedMessage s) {
|
||||||
return CodedOutputStream.computeRawVarint32Size(s.getSerializedSize())
|
return CodedOutputStream.computeRawVarint32Size(s.getSerializedSize())
|
||||||
+ s.getSerializedSize();
|
+ s.getSerializedSize();
|
||||||
|
@ -283,7 +282,7 @@ public class ImageWriter implements Closeable {
|
||||||
dircache.clear();
|
dircache.clear();
|
||||||
|
|
||||||
// close side files
|
// close side files
|
||||||
IOUtils.cleanup(null, dirs, inodes, blocks);
|
IOUtils.cleanupWithLogger(null, dirs, inodes, blocks);
|
||||||
if (null == dirs || null == inodes) {
|
if (null == dirs || null == inodes) {
|
||||||
// init failed
|
// init failed
|
||||||
if (raw != null) {
|
if (raw != null) {
|
||||||
|
@ -317,7 +316,6 @@ public class ImageWriter implements Closeable {
|
||||||
*/
|
*/
|
||||||
void writeMD5(String imagename) throws IOException {
|
void writeMD5(String imagename) throws IOException {
|
||||||
if (null == outdir) {
|
if (null == outdir) {
|
||||||
//LOG.warn("Not writing MD5");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
MD5Hash md5 = new MD5Hash(digest.digest());
|
MD5Hash md5 = new MD5Hash(digest.digest());
|
||||||
|
@ -382,7 +380,6 @@ public class ImageWriter implements Closeable {
|
||||||
|
|
||||||
void writeDirSection() throws IOException {
|
void writeDirSection() throws IOException {
|
||||||
// No header, so dirs can be written/compressed independently
|
// No header, so dirs can be written/compressed independently
|
||||||
//INodeDirectorySection.Builder b = INodeDirectorySection.newBuilder();
|
|
||||||
OutputStream sec = raw;
|
OutputStream sec = raw;
|
||||||
// copy dirs
|
// copy dirs
|
||||||
try (FileInputStream in = new FileInputStream(dirsTmp)) {
|
try (FileInputStream in = new FileInputStream(dirsTmp)) {
|
||||||
|
|
|
@ -84,11 +84,11 @@ public class SingleUGIResolver extends UGIResolver implements Configurable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addUser(String name) {
|
public void addUser(String name) {
|
||||||
//do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addGroup(String name) {
|
public void addGroup(String name) {
|
||||||
//do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,7 +121,6 @@ public class TreePath {
|
||||||
INode toFile(UGIResolver ugi, BlockResolver blk,
|
INode toFile(UGIResolver ugi, BlockResolver blk,
|
||||||
BlockAliasMap.Writer<FileRegion> out) throws IOException {
|
BlockAliasMap.Writer<FileRegion> out) throws IOException {
|
||||||
final FileStatus s = getFileStatus();
|
final FileStatus s = getFileStatus();
|
||||||
// TODO should this store resolver's user/group?
|
|
||||||
ugi.addUser(s.getOwner());
|
ugi.addUser(s.getOwner());
|
||||||
ugi.addGroup(s.getGroup());
|
ugi.addGroup(s.getGroup());
|
||||||
INodeFile.Builder b = INodeFile.newBuilder()
|
INodeFile.Builder b = INodeFile.newBuilder()
|
||||||
|
@ -142,7 +141,7 @@ public class TreePath {
|
||||||
"Exact path handle not supported by filesystem " + fs.toString());
|
"Exact path handle not supported by filesystem " + fs.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//TODO: storage policy should be configurable per path; use BlockResolver
|
// TODO: storage policy should be configurable per path; use BlockResolver
|
||||||
long off = 0L;
|
long off = 0L;
|
||||||
for (BlockProto block : blk.resolve(s)) {
|
for (BlockProto block : blk.resolve(s)) {
|
||||||
b.addBlocks(block);
|
b.addBlocks(block);
|
||||||
|
|
|
@ -84,19 +84,22 @@ import static org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRe
|
||||||
import static org.apache.hadoop.net.NodeBase.PATH_SEPARATOR_STR;
|
import static org.apache.hadoop.net.NodeBase.PATH_SEPARATOR_STR;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
public class TestNameNodeProvidedImplementation {
|
/**
|
||||||
|
* Integration tests for the Provided implementation.
|
||||||
|
*/
|
||||||
|
public class ITestProvidedImplementation {
|
||||||
|
|
||||||
@Rule public TestName name = new TestName();
|
@Rule public TestName name = new TestName();
|
||||||
public static final Logger LOG =
|
public static final Logger LOG =
|
||||||
LoggerFactory.getLogger(TestNameNodeProvidedImplementation.class);
|
LoggerFactory.getLogger(ITestProvidedImplementation.class);
|
||||||
|
|
||||||
final Random r = new Random();
|
private final Random r = new Random();
|
||||||
final File fBASE = new File(MiniDFSCluster.getBaseDirectory());
|
private final File fBASE = new File(MiniDFSCluster.getBaseDirectory());
|
||||||
final Path BASE = new Path(fBASE.toURI().toString());
|
private final Path pBASE = new Path(fBASE.toURI().toString());
|
||||||
final Path NAMEPATH = new Path(BASE, "providedDir");
|
private final Path providedPath = new Path(pBASE, "providedDir");
|
||||||
final Path NNDIRPATH = new Path(BASE, "nnDir");
|
private final Path nnDirPath = new Path(pBASE, "nnDir");
|
||||||
final String SINGLEUSER = "usr1";
|
private final String singleUser = "usr1";
|
||||||
final String SINGLEGROUP = "grp1";
|
private final String singleGroup = "grp1";
|
||||||
private final int numFiles = 10;
|
private final int numFiles = 10;
|
||||||
private final String filePrefix = "file";
|
private final String filePrefix = "file";
|
||||||
private final String fileSuffix = ".dat";
|
private final String fileSuffix = ".dat";
|
||||||
|
@ -104,8 +107,8 @@ public class TestNameNodeProvidedImplementation {
|
||||||
private long providedDataSize = 0;
|
private long providedDataSize = 0;
|
||||||
private final String bpid = "BP-1234-10.1.1.1-1224";
|
private final String bpid = "BP-1234-10.1.1.1-1224";
|
||||||
|
|
||||||
Configuration conf;
|
private Configuration conf;
|
||||||
MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setSeed() throws Exception {
|
public void setSeed() throws Exception {
|
||||||
|
@ -116,8 +119,8 @@ public class TestNameNodeProvidedImplementation {
|
||||||
r.setSeed(seed);
|
r.setSeed(seed);
|
||||||
System.out.println(name.getMethodName() + " seed: " + seed);
|
System.out.println(name.getMethodName() + " seed: " + seed);
|
||||||
conf = new HdfsConfiguration();
|
conf = new HdfsConfiguration();
|
||||||
conf.set(SingleUGIResolver.USER, SINGLEUSER);
|
conf.set(SingleUGIResolver.USER, singleUser);
|
||||||
conf.set(SingleUGIResolver.GROUP, SINGLEGROUP);
|
conf.set(SingleUGIResolver.GROUP, singleGroup);
|
||||||
|
|
||||||
conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
|
conf.set(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
|
||||||
DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
|
DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT);
|
||||||
|
@ -126,28 +129,28 @@ public class TestNameNodeProvidedImplementation {
|
||||||
conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
|
conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
|
||||||
TextFileRegionAliasMap.class, BlockAliasMap.class);
|
TextFileRegionAliasMap.class, BlockAliasMap.class);
|
||||||
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_DIR,
|
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_WRITE_DIR,
|
||||||
NNDIRPATH.toString());
|
nnDirPath.toString());
|
||||||
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_FILE,
|
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_READ_FILE,
|
||||||
new Path(NNDIRPATH, fileNameFromBlockPoolID(bpid)).toString());
|
new Path(nnDirPath, fileNameFromBlockPoolID(bpid)).toString());
|
||||||
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER, ",");
|
conf.set(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_TEXT_DELIMITER, ",");
|
||||||
|
|
||||||
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR_PROVIDED,
|
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR_PROVIDED,
|
||||||
new File(NAMEPATH.toUri()).toString());
|
new File(providedPath.toUri()).toString());
|
||||||
File imageDir = new File(NAMEPATH.toUri());
|
File imageDir = new File(providedPath.toUri());
|
||||||
if (!imageDir.exists()) {
|
if (!imageDir.exists()) {
|
||||||
LOG.info("Creating directory: " + imageDir);
|
LOG.info("Creating directory: " + imageDir);
|
||||||
imageDir.mkdirs();
|
imageDir.mkdirs();
|
||||||
}
|
}
|
||||||
|
|
||||||
File nnDir = new File(NNDIRPATH.toUri());
|
File nnDir = new File(nnDirPath.toUri());
|
||||||
if (!nnDir.exists()) {
|
if (!nnDir.exists()) {
|
||||||
nnDir.mkdirs();
|
nnDir.mkdirs();
|
||||||
}
|
}
|
||||||
|
|
||||||
// create 10 random files under BASE
|
// create 10 random files under pBASE
|
||||||
for (int i=0; i < numFiles; i++) {
|
for (int i=0; i < numFiles; i++) {
|
||||||
File newFile = new File(
|
File newFile = new File(
|
||||||
new Path(NAMEPATH, filePrefix + i + fileSuffix).toUri());
|
new Path(providedPath, filePrefix + i + fileSuffix).toUri());
|
||||||
if(!newFile.exists()) {
|
if(!newFile.exists()) {
|
||||||
try {
|
try {
|
||||||
LOG.info("Creating " + newFile.toString());
|
LOG.info("Creating " + newFile.toString());
|
||||||
|
@ -244,9 +247,9 @@ public class TestNameNodeProvidedImplementation {
|
||||||
@Test(timeout=20000)
|
@Test(timeout=20000)
|
||||||
public void testLoadImage() throws Exception {
|
public void testLoadImage() throws Exception {
|
||||||
final long seed = r.nextLong();
|
final long seed = r.nextLong();
|
||||||
LOG.info("NAMEPATH: " + NAMEPATH);
|
LOG.info("providedPath: " + providedPath);
|
||||||
createImage(new RandomTreeWalk(seed), NNDIRPATH, FixedBlockResolver.class);
|
createImage(new RandomTreeWalk(seed), nnDirPath, FixedBlockResolver.class);
|
||||||
startCluster(NNDIRPATH, 0,
|
startCluster(nnDirPath, 0,
|
||||||
new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null,
|
new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null,
|
||||||
false);
|
false);
|
||||||
|
|
||||||
|
@ -260,8 +263,8 @@ public class TestNameNodeProvidedImplementation {
|
||||||
hs.getPath().toUri().getPath());
|
hs.getPath().toUri().getPath());
|
||||||
assertEquals(rs.getPermission(), hs.getPermission());
|
assertEquals(rs.getPermission(), hs.getPermission());
|
||||||
assertEquals(rs.getLen(), hs.getLen());
|
assertEquals(rs.getLen(), hs.getLen());
|
||||||
assertEquals(SINGLEUSER, hs.getOwner());
|
assertEquals(singleUser, hs.getOwner());
|
||||||
assertEquals(SINGLEGROUP, hs.getGroup());
|
assertEquals(singleGroup, hs.getGroup());
|
||||||
assertEquals(rs.getAccessTime(), hs.getAccessTime());
|
assertEquals(rs.getAccessTime(), hs.getAccessTime());
|
||||||
assertEquals(rs.getModificationTime(), hs.getModificationTime());
|
assertEquals(rs.getModificationTime(), hs.getModificationTime());
|
||||||
}
|
}
|
||||||
|
@ -271,10 +274,10 @@ public class TestNameNodeProvidedImplementation {
|
||||||
public void testProvidedReporting() throws Exception {
|
public void testProvidedReporting() throws Exception {
|
||||||
conf.setClass(ImageWriter.Options.UGI_CLASS,
|
conf.setClass(ImageWriter.Options.UGI_CLASS,
|
||||||
SingleUGIResolver.class, UGIResolver.class);
|
SingleUGIResolver.class, UGIResolver.class);
|
||||||
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
|
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
|
||||||
FixedBlockResolver.class);
|
FixedBlockResolver.class);
|
||||||
int numDatanodes = 10;
|
int numDatanodes = 10;
|
||||||
startCluster(NNDIRPATH, numDatanodes,
|
startCluster(nnDirPath, numDatanodes,
|
||||||
new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null,
|
new StorageType[] {StorageType.PROVIDED, StorageType.DISK}, null,
|
||||||
false);
|
false);
|
||||||
long diskCapacity = 1000;
|
long diskCapacity = 1000;
|
||||||
|
@ -350,10 +353,10 @@ public class TestNameNodeProvidedImplementation {
|
||||||
public void testDefaultReplication() throws Exception {
|
public void testDefaultReplication() throws Exception {
|
||||||
int targetReplication = 2;
|
int targetReplication = 2;
|
||||||
conf.setInt(FixedBlockMultiReplicaResolver.REPLICATION, targetReplication);
|
conf.setInt(FixedBlockMultiReplicaResolver.REPLICATION, targetReplication);
|
||||||
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
|
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
|
||||||
FixedBlockMultiReplicaResolver.class);
|
FixedBlockMultiReplicaResolver.class);
|
||||||
// make the last Datanode with only DISK
|
// make the last Datanode with only DISK
|
||||||
startCluster(NNDIRPATH, 3, null,
|
startCluster(nnDirPath, 3, null,
|
||||||
new StorageType[][] {
|
new StorageType[][] {
|
||||||
{StorageType.PROVIDED, StorageType.DISK},
|
{StorageType.PROVIDED, StorageType.DISK},
|
||||||
{StorageType.PROVIDED, StorageType.DISK},
|
{StorageType.PROVIDED, StorageType.DISK},
|
||||||
|
@ -364,15 +367,10 @@ public class TestNameNodeProvidedImplementation {
|
||||||
|
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
int count = 0;
|
int count = 0;
|
||||||
for (TreePath e : new FSTreeWalk(NAMEPATH, conf)) {
|
for (TreePath e : new FSTreeWalk(providedPath, conf)) {
|
||||||
FileStatus rs = e.getFileStatus();
|
FileStatus rs = e.getFileStatus();
|
||||||
Path hp = removePrefix(NAMEPATH, rs.getPath());
|
Path hp = removePrefix(providedPath, rs.getPath());
|
||||||
LOG.info("hp " + hp.toUri().getPath());
|
LOG.info("path: " + hp.toUri().getPath());
|
||||||
//skip HDFS specific files, which may have been created later on.
|
|
||||||
if (hp.toString().contains("in_use.lock")
|
|
||||||
|| hp.toString().contains("current")) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
e.accept(count++);
|
e.accept(count++);
|
||||||
assertTrue(fs.exists(hp));
|
assertTrue(fs.exists(hp));
|
||||||
FileStatus hs = fs.getFileStatus(hp);
|
FileStatus hs = fs.getFileStatus(hp);
|
||||||
|
@ -383,7 +381,7 @@ public class TestNameNodeProvidedImplementation {
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for(; i < bl.length; i++) {
|
for(; i < bl.length; i++) {
|
||||||
int currentRep = bl[i].getHosts().length;
|
int currentRep = bl[i].getHosts().length;
|
||||||
assertEquals(targetReplication , currentRep);
|
assertEquals(targetReplication, currentRep);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -411,15 +409,10 @@ public class TestNameNodeProvidedImplementation {
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
int count = 0;
|
int count = 0;
|
||||||
// read NN metadata, verify contents match
|
// read NN metadata, verify contents match
|
||||||
for (TreePath e : new FSTreeWalk(NAMEPATH, conf)) {
|
for (TreePath e : new FSTreeWalk(providedPath, conf)) {
|
||||||
FileStatus rs = e.getFileStatus();
|
FileStatus rs = e.getFileStatus();
|
||||||
Path hp = removePrefix(NAMEPATH, rs.getPath());
|
Path hp = removePrefix(providedPath, rs.getPath());
|
||||||
LOG.info("hp " + hp.toUri().getPath());
|
LOG.info("path: " + hp.toUri().getPath());
|
||||||
//skip HDFS specific files, which may have been created later on.
|
|
||||||
if(hp.toString().contains("in_use.lock")
|
|
||||||
|| hp.toString().contains("current")) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
e.accept(count++);
|
e.accept(count++);
|
||||||
assertTrue(fs.exists(hp));
|
assertTrue(fs.exists(hp));
|
||||||
FileStatus hs = fs.getFileStatus(hp);
|
FileStatus hs = fs.getFileStatus(hp);
|
||||||
|
@ -462,7 +455,7 @@ public class TestNameNodeProvidedImplementation {
|
||||||
private BlockLocation[] createFile(Path path, short replication,
|
private BlockLocation[] createFile(Path path, short replication,
|
||||||
long fileLen, long blockLen) throws IOException {
|
long fileLen, long blockLen) throws IOException {
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
//create a sample file that is not provided
|
// create a file that is not provided
|
||||||
DFSTestUtil.createFile(fs, path, false, (int) blockLen,
|
DFSTestUtil.createFile(fs, path, false, (int) blockLen,
|
||||||
fileLen, blockLen, replication, 0, true);
|
fileLen, blockLen, replication, 0, true);
|
||||||
return fs.getFileBlockLocations(path, 0, fileLen);
|
return fs.getFileBlockLocations(path, 0, fileLen);
|
||||||
|
@ -471,7 +464,7 @@ public class TestNameNodeProvidedImplementation {
|
||||||
@Test(timeout=30000)
|
@Test(timeout=30000)
|
||||||
public void testClusterWithEmptyImage() throws IOException {
|
public void testClusterWithEmptyImage() throws IOException {
|
||||||
// start a cluster with 2 datanodes without any provided storage
|
// start a cluster with 2 datanodes without any provided storage
|
||||||
startCluster(NNDIRPATH, 2, null,
|
startCluster(nnDirPath, 2, null,
|
||||||
new StorageType[][] {
|
new StorageType[][] {
|
||||||
{StorageType.DISK},
|
{StorageType.DISK},
|
||||||
{StorageType.DISK}},
|
{StorageType.DISK}},
|
||||||
|
@ -518,10 +511,10 @@ public class TestNameNodeProvidedImplementation {
|
||||||
*/
|
*/
|
||||||
@Test(timeout=50000)
|
@Test(timeout=50000)
|
||||||
public void testSetReplicationForProvidedFiles() throws Exception {
|
public void testSetReplicationForProvidedFiles() throws Exception {
|
||||||
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
|
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
|
||||||
FixedBlockResolver.class);
|
FixedBlockResolver.class);
|
||||||
// 10 Datanodes with both DISK and PROVIDED storage
|
// 10 Datanodes with both DISK and PROVIDED storage
|
||||||
startCluster(NNDIRPATH, 10,
|
startCluster(nnDirPath, 10,
|
||||||
new StorageType[]{
|
new StorageType[]{
|
||||||
StorageType.PROVIDED, StorageType.DISK},
|
StorageType.PROVIDED, StorageType.DISK},
|
||||||
null,
|
null,
|
||||||
|
@ -559,9 +552,9 @@ public class TestNameNodeProvidedImplementation {
|
||||||
|
|
||||||
@Test(timeout=30000)
|
@Test(timeout=30000)
|
||||||
public void testProvidedDatanodeFailures() throws Exception {
|
public void testProvidedDatanodeFailures() throws Exception {
|
||||||
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
|
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
|
||||||
FixedBlockResolver.class);
|
FixedBlockResolver.class);
|
||||||
startCluster(NNDIRPATH, 3, null,
|
startCluster(nnDirPath, 3, null,
|
||||||
new StorageType[][] {
|
new StorageType[][] {
|
||||||
{StorageType.PROVIDED, StorageType.DISK},
|
{StorageType.PROVIDED, StorageType.DISK},
|
||||||
{StorageType.PROVIDED, StorageType.DISK},
|
{StorageType.PROVIDED, StorageType.DISK},
|
||||||
|
@ -581,23 +574,23 @@ public class TestNameNodeProvidedImplementation {
|
||||||
// 2 locations returned as there are 2 PROVIDED datanodes
|
// 2 locations returned as there are 2 PROVIDED datanodes
|
||||||
DatanodeInfo[] dnInfos =
|
DatanodeInfo[] dnInfos =
|
||||||
getAndCheckBlockLocations(client, filename, baseFileLen, 1, 2);
|
getAndCheckBlockLocations(client, filename, baseFileLen, 1, 2);
|
||||||
//the location should be one of the provided DNs available
|
// the location should be one of the provided DNs available
|
||||||
assertTrue(
|
assertTrue(
|
||||||
dnInfos[0].getDatanodeUuid().equals(
|
dnInfos[0].getDatanodeUuid().equals(
|
||||||
providedDatanode1.getDatanodeUuid())
|
providedDatanode1.getDatanodeUuid())
|
||||||
|| dnInfos[0].getDatanodeUuid().equals(
|
|| dnInfos[0].getDatanodeUuid().equals(
|
||||||
providedDatanode2.getDatanodeUuid()));
|
providedDatanode2.getDatanodeUuid()));
|
||||||
|
|
||||||
//stop the 1st provided datanode
|
// stop the 1st provided datanode
|
||||||
MiniDFSCluster.DataNodeProperties providedDNProperties1 =
|
MiniDFSCluster.DataNodeProperties providedDNProperties1 =
|
||||||
cluster.stopDataNode(0);
|
cluster.stopDataNode(0);
|
||||||
|
|
||||||
//make NameNode detect that datanode is down
|
// make NameNode detect that datanode is down
|
||||||
BlockManagerTestUtil.noticeDeadDatanode(
|
BlockManagerTestUtil.noticeDeadDatanode(
|
||||||
cluster.getNameNode(),
|
cluster.getNameNode(),
|
||||||
providedDatanode1.getDatanodeId().getXferAddr());
|
providedDatanode1.getDatanodeId().getXferAddr());
|
||||||
|
|
||||||
//should find the block on the 2nd provided datanode
|
// should find the block on the 2nd provided datanode
|
||||||
dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1);
|
dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1);
|
||||||
assertEquals(providedDatanode2.getDatanodeUuid(),
|
assertEquals(providedDatanode2.getDatanodeUuid(),
|
||||||
dnInfos[0].getDatanodeUuid());
|
dnInfos[0].getDatanodeUuid());
|
||||||
|
@ -614,15 +607,15 @@ public class TestNameNodeProvidedImplementation {
|
||||||
// BR count for the provided ProvidedDatanodeStorageInfo should reset to
|
// BR count for the provided ProvidedDatanodeStorageInfo should reset to
|
||||||
// 0, when all DNs with PROVIDED storage fail.
|
// 0, when all DNs with PROVIDED storage fail.
|
||||||
assertEquals(0, providedDNInfo.getBlockReportCount());
|
assertEquals(0, providedDNInfo.getBlockReportCount());
|
||||||
//restart the provided datanode
|
// restart the provided datanode
|
||||||
cluster.restartDataNode(providedDNProperties1, true);
|
cluster.restartDataNode(providedDNProperties1, true);
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
||||||
assertEquals(1, providedDNInfo.getBlockReportCount());
|
assertEquals(1, providedDNInfo.getBlockReportCount());
|
||||||
|
|
||||||
//should find the block on the 1st provided datanode now
|
// should find the block on the 1st provided datanode now
|
||||||
dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1);
|
dnInfos = getAndCheckBlockLocations(client, filename, baseFileLen, 1, 1);
|
||||||
//not comparing UUIDs as the datanode can now have a different one.
|
// not comparing UUIDs as the datanode can now have a different one.
|
||||||
assertEquals(providedDatanode1.getDatanodeId().getXferAddr(),
|
assertEquals(providedDatanode1.getDatanodeId().getXferAddr(),
|
||||||
dnInfos[0].getXferAddr());
|
dnInfos[0].getXferAddr());
|
||||||
}
|
}
|
||||||
|
@ -630,10 +623,10 @@ public class TestNameNodeProvidedImplementation {
|
||||||
|
|
||||||
@Test(timeout=300000)
|
@Test(timeout=300000)
|
||||||
public void testTransientDeadDatanodes() throws Exception {
|
public void testTransientDeadDatanodes() throws Exception {
|
||||||
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
|
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
|
||||||
FixedBlockResolver.class);
|
FixedBlockResolver.class);
|
||||||
// 3 Datanodes, 2 PROVIDED and other DISK
|
// 3 Datanodes, 2 PROVIDED and other DISK
|
||||||
startCluster(NNDIRPATH, 3, null,
|
startCluster(nnDirPath, 3, null,
|
||||||
new StorageType[][] {
|
new StorageType[][] {
|
||||||
{StorageType.PROVIDED, StorageType.DISK},
|
{StorageType.PROVIDED, StorageType.DISK},
|
||||||
{StorageType.PROVIDED, StorageType.DISK},
|
{StorageType.PROVIDED, StorageType.DISK},
|
||||||
|
@ -668,10 +661,10 @@ public class TestNameNodeProvidedImplementation {
|
||||||
|
|
||||||
@Test(timeout=30000)
|
@Test(timeout=30000)
|
||||||
public void testNamenodeRestart() throws Exception {
|
public void testNamenodeRestart() throws Exception {
|
||||||
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
|
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
|
||||||
FixedBlockResolver.class);
|
FixedBlockResolver.class);
|
||||||
// 3 Datanodes, 2 PROVIDED and other DISK
|
// 3 Datanodes, 2 PROVIDED and other DISK
|
||||||
startCluster(NNDIRPATH, 3, null,
|
startCluster(nnDirPath, 3, null,
|
||||||
new StorageType[][] {
|
new StorageType[][] {
|
||||||
{StorageType.PROVIDED, StorageType.DISK},
|
{StorageType.PROVIDED, StorageType.DISK},
|
||||||
{StorageType.PROVIDED, StorageType.DISK},
|
{StorageType.PROVIDED, StorageType.DISK},
|
||||||
|
@ -696,7 +689,7 @@ public class TestNameNodeProvidedImplementation {
|
||||||
cluster.getConfiguration(0));
|
cluster.getConfiguration(0));
|
||||||
if (fileIndex < numFiles && fileIndex >= 0) {
|
if (fileIndex < numFiles && fileIndex >= 0) {
|
||||||
String filename = filePrefix + fileIndex + fileSuffix;
|
String filename = filePrefix + fileIndex + fileSuffix;
|
||||||
File file = new File(new Path(NAMEPATH, filename).toUri());
|
File file = new File(new Path(providedPath, filename).toUri());
|
||||||
long fileLen = file.length();
|
long fileLen = file.length();
|
||||||
long blockSize = conf.getLong(FixedBlockResolver.BLOCKSIZE,
|
long blockSize = conf.getLong(FixedBlockResolver.BLOCKSIZE,
|
||||||
FixedBlockResolver.BLOCKSIZE_DEFAULT);
|
FixedBlockResolver.BLOCKSIZE_DEFAULT);
|
||||||
|
@ -710,10 +703,10 @@ public class TestNameNodeProvidedImplementation {
|
||||||
@Test(timeout=30000)
|
@Test(timeout=30000)
|
||||||
public void testSetClusterID() throws Exception {
|
public void testSetClusterID() throws Exception {
|
||||||
String clusterID = "PROVIDED-CLUSTER";
|
String clusterID = "PROVIDED-CLUSTER";
|
||||||
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
|
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
|
||||||
FixedBlockResolver.class, clusterID, TextFileRegionAliasMap.class);
|
FixedBlockResolver.class, clusterID, TextFileRegionAliasMap.class);
|
||||||
// 2 Datanodes, 1 PROVIDED and other DISK
|
// 2 Datanodes, 1 PROVIDED and other DISK
|
||||||
startCluster(NNDIRPATH, 2, null,
|
startCluster(nnDirPath, 2, null,
|
||||||
new StorageType[][] {
|
new StorageType[][] {
|
||||||
{StorageType.PROVIDED, StorageType.DISK},
|
{StorageType.PROVIDED, StorageType.DISK},
|
||||||
{StorageType.DISK}},
|
{StorageType.DISK}},
|
||||||
|
@ -726,10 +719,10 @@ public class TestNameNodeProvidedImplementation {
|
||||||
public void testNumberOfProvidedLocations() throws Exception {
|
public void testNumberOfProvidedLocations() throws Exception {
|
||||||
// set default replication to 4
|
// set default replication to 4
|
||||||
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4);
|
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4);
|
||||||
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
|
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
|
||||||
FixedBlockResolver.class);
|
FixedBlockResolver.class);
|
||||||
// start with 4 PROVIDED location
|
// start with 4 PROVIDED location
|
||||||
startCluster(NNDIRPATH, 4,
|
startCluster(nnDirPath, 4,
|
||||||
new StorageType[]{
|
new StorageType[]{
|
||||||
StorageType.PROVIDED, StorageType.DISK},
|
StorageType.PROVIDED, StorageType.DISK},
|
||||||
null,
|
null,
|
||||||
|
@ -759,10 +752,10 @@ public class TestNameNodeProvidedImplementation {
|
||||||
conf.setLong(FixedBlockResolver.BLOCKSIZE, baseFileLen/10);
|
conf.setLong(FixedBlockResolver.BLOCKSIZE, baseFileLen/10);
|
||||||
// set default replication to 4
|
// set default replication to 4
|
||||||
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4);
|
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 4);
|
||||||
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
|
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
|
||||||
FixedBlockResolver.class);
|
FixedBlockResolver.class);
|
||||||
// start with 4 PROVIDED location
|
// start with 4 PROVIDED location
|
||||||
startCluster(NNDIRPATH, 4,
|
startCluster(nnDirPath, 4,
|
||||||
new StorageType[]{
|
new StorageType[]{
|
||||||
StorageType.PROVIDED, StorageType.DISK},
|
StorageType.PROVIDED, StorageType.DISK},
|
||||||
null,
|
null,
|
||||||
|
@ -795,15 +788,15 @@ public class TestNameNodeProvidedImplementation {
|
||||||
levelDBAliasMapServer.setConf(conf);
|
levelDBAliasMapServer.setConf(conf);
|
||||||
levelDBAliasMapServer.start();
|
levelDBAliasMapServer.start();
|
||||||
|
|
||||||
createImage(new FSTreeWalk(NAMEPATH, conf),
|
createImage(new FSTreeWalk(providedPath, conf),
|
||||||
NNDIRPATH,
|
nnDirPath,
|
||||||
FixedBlockResolver.class, "",
|
FixedBlockResolver.class, "",
|
||||||
InMemoryLevelDBAliasMapClient.class);
|
InMemoryLevelDBAliasMapClient.class);
|
||||||
levelDBAliasMapServer.close();
|
levelDBAliasMapServer.close();
|
||||||
|
|
||||||
// start cluster with two datanodes,
|
// start cluster with two datanodes,
|
||||||
// each with 1 PROVIDED volume and other DISK volume
|
// each with 1 PROVIDED volume and other DISK volume
|
||||||
startCluster(NNDIRPATH, 2,
|
startCluster(nnDirPath, 2,
|
||||||
new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
|
new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
|
||||||
null, false);
|
null, false);
|
||||||
verifyFileSystemContents();
|
verifyFileSystemContents();
|
||||||
|
@ -841,9 +834,9 @@ public class TestNameNodeProvidedImplementation {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDatanodeLifeCycle() throws Exception {
|
public void testDatanodeLifeCycle() throws Exception {
|
||||||
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
|
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
|
||||||
FixedBlockResolver.class);
|
FixedBlockResolver.class);
|
||||||
startCluster(NNDIRPATH, 3,
|
startCluster(nnDirPath, 3,
|
||||||
new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
|
new StorageType[] {StorageType.PROVIDED, StorageType.DISK},
|
||||||
null, false);
|
null, false);
|
||||||
|
|
||||||
|
@ -915,7 +908,7 @@ public class TestNameNodeProvidedImplementation {
|
||||||
"BlockPlacementPolicyRackFaultTolerant",
|
"BlockPlacementPolicyRackFaultTolerant",
|
||||||
"BlockPlacementPolicyWithNodeGroup",
|
"BlockPlacementPolicyWithNodeGroup",
|
||||||
"BlockPlacementPolicyWithUpgradeDomain"};
|
"BlockPlacementPolicyWithUpgradeDomain"};
|
||||||
createImage(new FSTreeWalk(NAMEPATH, conf), NNDIRPATH,
|
createImage(new FSTreeWalk(providedPath, conf), nnDirPath,
|
||||||
FixedBlockResolver.class);
|
FixedBlockResolver.class);
|
||||||
String[] racks =
|
String[] racks =
|
||||||
{"/pod0/rack0", "/pod0/rack0", "/pod0/rack1", "/pod0/rack1",
|
{"/pod0/rack0", "/pod0/rack0", "/pod0/rack1", "/pod0/rack1",
|
||||||
|
@ -923,7 +916,7 @@ public class TestNameNodeProvidedImplementation {
|
||||||
for (String policy: policies) {
|
for (String policy: policies) {
|
||||||
LOG.info("Using policy: " + packageName + "." + policy);
|
LOG.info("Using policy: " + packageName + "." + policy);
|
||||||
conf.set(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, packageName + "." + policy);
|
conf.set(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, packageName + "." + policy);
|
||||||
startCluster(NNDIRPATH, racks.length,
|
startCluster(nnDirPath, racks.length,
|
||||||
new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
|
new StorageType[]{StorageType.PROVIDED, StorageType.DISK},
|
||||||
null, false, racks);
|
null, false, racks);
|
||||||
verifyFileSystemContents();
|
verifyFileSystemContents();
|
|
@ -17,7 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -43,7 +42,6 @@ public class RandomTreeWalk extends TreeWalk {
|
||||||
private final float depth;
|
private final float depth;
|
||||||
private final int children;
|
private final int children;
|
||||||
private final Map<Long, Long> mSeed;
|
private final Map<Long, Long> mSeed;
|
||||||
//private final AtomicLong blockIds = new AtomicLong(1L << 30);
|
|
||||||
|
|
||||||
RandomTreeWalk(long seed) {
|
RandomTreeWalk(long seed) {
|
||||||
this(seed, 10);
|
this(seed, 10);
|
||||||
|
@ -54,7 +52,7 @@ public class RandomTreeWalk extends TreeWalk {
|
||||||
}
|
}
|
||||||
|
|
||||||
RandomTreeWalk(long seed, int children, float depth) {
|
RandomTreeWalk(long seed, int children, float depth) {
|
||||||
this(randomRoot(seed), seed, children, 0.15f);
|
this(randomRoot(seed), seed, children, depth);
|
||||||
}
|
}
|
||||||
|
|
||||||
RandomTreeWalk(Path root, long seed, int children, float depth) {
|
RandomTreeWalk(Path root, long seed, int children, float depth) {
|
||||||
|
|
Loading…
Reference in New Issue