HDFS-7467. Provide storage tier information for a directory via fsck. (Benoy Antony)
This commit is contained in:
parent
d389a1ae98
commit
4e400030f6
|
@ -25,6 +25,7 @@ import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -45,6 +46,7 @@ import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.RemotePeerFactory;
|
import org.apache.hadoop.hdfs.RemotePeerFactory;
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
import org.apache.hadoop.hdfs.net.TcpPeerServer;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
@ -127,6 +129,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
|
||||||
private boolean showBlocks = false;
|
private boolean showBlocks = false;
|
||||||
private boolean showLocations = false;
|
private boolean showLocations = false;
|
||||||
private boolean showRacks = false;
|
private boolean showRacks = false;
|
||||||
|
private boolean showStoragePolcies = false;
|
||||||
private boolean showCorruptFileBlocks = false;
|
private boolean showCorruptFileBlocks = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -163,6 +166,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
|
||||||
private List<String> snapshottableDirs = null;
|
private List<String> snapshottableDirs = null;
|
||||||
|
|
||||||
private final BlockPlacementPolicy bpPolicy;
|
private final BlockPlacementPolicy bpPolicy;
|
||||||
|
private StoragePolicySummary storageTypeSummary = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Filesystem checker.
|
* Filesystem checker.
|
||||||
|
@ -199,6 +203,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
|
||||||
else if (key.equals("blocks")) { this.showBlocks = true; }
|
else if (key.equals("blocks")) { this.showBlocks = true; }
|
||||||
else if (key.equals("locations")) { this.showLocations = true; }
|
else if (key.equals("locations")) { this.showLocations = true; }
|
||||||
else if (key.equals("racks")) { this.showRacks = true; }
|
else if (key.equals("racks")) { this.showRacks = true; }
|
||||||
|
else if (key.equals("storagepolicies")) { this.showStoragePolcies = true; }
|
||||||
else if (key.equals("openforwrite")) {this.showOpenFiles = true; }
|
else if (key.equals("openforwrite")) {this.showOpenFiles = true; }
|
||||||
else if (key.equals("listcorruptfileblocks")) {
|
else if (key.equals("listcorruptfileblocks")) {
|
||||||
this.showCorruptFileBlocks = true;
|
this.showCorruptFileBlocks = true;
|
||||||
|
@ -332,6 +337,11 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.showStoragePolcies) {
|
||||||
|
storageTypeSummary = new StoragePolicySummary(
|
||||||
|
namenode.getNamesystem().getBlockManager().getStoragePolicies());
|
||||||
|
}
|
||||||
|
|
||||||
Result res = new Result(conf);
|
Result res = new Result(conf);
|
||||||
|
|
||||||
check(path, file, res);
|
check(path, file, res);
|
||||||
|
@ -340,6 +350,10 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
|
||||||
out.println(" Number of data-nodes:\t\t" + totalDatanodes);
|
out.println(" Number of data-nodes:\t\t" + totalDatanodes);
|
||||||
out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks());
|
out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks());
|
||||||
|
|
||||||
|
if (this.showStoragePolcies) {
|
||||||
|
out.print(storageTypeSummary.toString());
|
||||||
|
}
|
||||||
|
|
||||||
out.println("FSCK ended at " + new Date() + " in "
|
out.println("FSCK ended at " + new Date() + " in "
|
||||||
+ (Time.now() - startTime + " milliseconds"));
|
+ (Time.now() - startTime + " milliseconds"));
|
||||||
|
|
||||||
|
@ -487,7 +501,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
|
||||||
boolean isCorrupt = lBlk.isCorrupt();
|
boolean isCorrupt = lBlk.isCorrupt();
|
||||||
String blkName = block.toString();
|
String blkName = block.toString();
|
||||||
DatanodeInfo[] locs = lBlk.getLocations();
|
DatanodeInfo[] locs = lBlk.getLocations();
|
||||||
NumberReplicas numberReplicas = namenode.getNamesystem().getBlockManager().countNodes(block.getLocalBlock());
|
NumberReplicas numberReplicas =
|
||||||
|
namenode.getNamesystem().getBlockManager().countNodes(block.getLocalBlock());
|
||||||
int liveReplicas = numberReplicas.liveReplicas();
|
int liveReplicas = numberReplicas.liveReplicas();
|
||||||
res.totalReplicas += liveReplicas;
|
res.totalReplicas += liveReplicas;
|
||||||
short targetFileReplication = file.getReplication();
|
short targetFileReplication = file.getReplication();
|
||||||
|
@ -496,6 +511,12 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
|
||||||
res.excessiveReplicas += (liveReplicas - targetFileReplication);
|
res.excessiveReplicas += (liveReplicas - targetFileReplication);
|
||||||
res.numOverReplicatedBlocks += 1;
|
res.numOverReplicatedBlocks += 1;
|
||||||
}
|
}
|
||||||
|
//keep track of storage tier counts
|
||||||
|
if (this.showStoragePolcies && lBlk.getStorageTypes() != null) {
|
||||||
|
StorageType[] storageTypes = lBlk.getStorageTypes();
|
||||||
|
storageTypeSummary.add(Arrays.copyOf(storageTypes, storageTypes.length),
|
||||||
|
fsn.getBlockManager().getStoragePolicy(file.getStoragePolicy()));
|
||||||
|
}
|
||||||
// Check if block is Corrupt
|
// Check if block is Corrupt
|
||||||
if (isCorrupt) {
|
if (isCorrupt) {
|
||||||
corrupt++;
|
corrupt++;
|
||||||
|
|
|
@ -0,0 +1,257 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import java.text.NumberFormat;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.EnumMap;
|
||||||
|
import java.util.Formatter;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Aggregate the storage type information for a set of blocks
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class StoragePolicySummary {
|
||||||
|
|
||||||
|
Map<StorageTypeAllocation, Long> storageComboCounts = new HashMap<>();
|
||||||
|
final BlockStoragePolicy[] storagePolicies;
|
||||||
|
int totalBlocks;
|
||||||
|
|
||||||
|
StoragePolicySummary(BlockStoragePolicy[] storagePolicies) {
|
||||||
|
this.storagePolicies = storagePolicies;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a storage type combination
|
||||||
|
void add(StorageType[] storageTypes, BlockStoragePolicy policy) {
|
||||||
|
StorageTypeAllocation storageCombo =
|
||||||
|
new StorageTypeAllocation(storageTypes, policy);
|
||||||
|
Long count = storageComboCounts.get(storageCombo);
|
||||||
|
if (count == null) {
|
||||||
|
storageComboCounts.put(storageCombo, 1l);
|
||||||
|
storageCombo.setActualStoragePolicy(
|
||||||
|
getStoragePolicy(storageCombo.getStorageTypes()));
|
||||||
|
} else {
|
||||||
|
storageComboCounts.put(storageCombo, count.longValue()+1);
|
||||||
|
}
|
||||||
|
totalBlocks++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// sort the storageType combinations based on the total blocks counts
|
||||||
|
// in descending order
|
||||||
|
static List<Entry<StorageTypeAllocation, Long>> sortByComparator(
|
||||||
|
Map<StorageTypeAllocation, Long> unsortMap) {
|
||||||
|
List<Entry<StorageTypeAllocation, Long>> storageAllocations =
|
||||||
|
new LinkedList<>(unsortMap.entrySet());
|
||||||
|
// Sorting the list based on values
|
||||||
|
Collections.sort(storageAllocations,
|
||||||
|
new Comparator<Entry<StorageTypeAllocation, Long>>() {
|
||||||
|
public int compare(Entry<StorageTypeAllocation, Long> o1,
|
||||||
|
Entry<StorageTypeAllocation, Long> o2)
|
||||||
|
{
|
||||||
|
return o2.getValue().compareTo(o1.getValue());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return storageAllocations;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
StringBuilder compliantBlocksSB = new StringBuilder();
|
||||||
|
compliantBlocksSB.append("\nBlocks satisfying the specified storage policy:");
|
||||||
|
compliantBlocksSB.append("\nStorage Policy # of blocks % of blocks\n");
|
||||||
|
StringBuilder nonCompliantBlocksSB = new StringBuilder();
|
||||||
|
Formatter compliantFormatter = new Formatter(compliantBlocksSB);
|
||||||
|
Formatter nonCompliantFormatter = new Formatter(nonCompliantBlocksSB);
|
||||||
|
NumberFormat percentFormat = NumberFormat.getPercentInstance();
|
||||||
|
percentFormat.setMinimumFractionDigits(4);
|
||||||
|
percentFormat.setMaximumFractionDigits(4);
|
||||||
|
for (Map.Entry<StorageTypeAllocation, Long> storageComboCount:
|
||||||
|
sortByComparator(storageComboCounts)) {
|
||||||
|
double percent = (double) storageComboCount.getValue() /
|
||||||
|
(double) totalBlocks;
|
||||||
|
StorageTypeAllocation sta = storageComboCount.getKey();
|
||||||
|
if (sta.policyMatches()) {
|
||||||
|
compliantFormatter.format("%-25s %10d %20s%n",
|
||||||
|
sta.getStoragePolicyDescriptor(),
|
||||||
|
storageComboCount.getValue(),
|
||||||
|
percentFormat.format(percent));
|
||||||
|
} else {
|
||||||
|
if (nonCompliantBlocksSB.length() == 0) {
|
||||||
|
nonCompliantBlocksSB.append("\nBlocks NOT satisfying the specified storage policy:");
|
||||||
|
nonCompliantBlocksSB.append("\nStorage Policy ");
|
||||||
|
nonCompliantBlocksSB.append(
|
||||||
|
"Specified Storage Policy # of blocks % of blocks\n");
|
||||||
|
}
|
||||||
|
nonCompliantFormatter.format("%-35s %-20s %10d %20s%n",
|
||||||
|
sta.getStoragePolicyDescriptor(),
|
||||||
|
sta.getSpecifiedStoragePolicy().getName(),
|
||||||
|
storageComboCount.getValue(),
|
||||||
|
percentFormat.format(percent));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (nonCompliantBlocksSB.length() == 0) {
|
||||||
|
nonCompliantBlocksSB.append("\nAll blocks satisfy specified storage policy.\n");
|
||||||
|
}
|
||||||
|
compliantFormatter.close();
|
||||||
|
nonCompliantFormatter.close();
|
||||||
|
return compliantBlocksSB.toString() + nonCompliantBlocksSB;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param storageTypes - sorted array of storageTypes
|
||||||
|
* @return Storage Policy which matches the specific storage Combination
|
||||||
|
*/
|
||||||
|
private BlockStoragePolicy getStoragePolicy(StorageType[] storageTypes) {
|
||||||
|
for (BlockStoragePolicy storagePolicy:storagePolicies) {
|
||||||
|
StorageType[] policyStorageTypes = storagePolicy.getStorageTypes();
|
||||||
|
policyStorageTypes = Arrays.copyOf(policyStorageTypes, policyStorageTypes.length);
|
||||||
|
Arrays.sort(policyStorageTypes);
|
||||||
|
if (policyStorageTypes.length <= storageTypes.length) {
|
||||||
|
int i = 0;
|
||||||
|
for (; i < policyStorageTypes.length; i++) {
|
||||||
|
if (policyStorageTypes[i] != storageTypes[i]) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (i < policyStorageTypes.length) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
int j=policyStorageTypes.length;
|
||||||
|
for (; j < storageTypes.length; j++) {
|
||||||
|
if (policyStorageTypes[i-1] != storageTypes[j]) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (j==storageTypes.length) {
|
||||||
|
return storagePolicy;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal class which represents a unique Storage type combination
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
static class StorageTypeAllocation {
|
||||||
|
private final BlockStoragePolicy specifiedStoragePolicy;
|
||||||
|
private final StorageType[] storageTypes;
|
||||||
|
private BlockStoragePolicy actualStoragePolicy;
|
||||||
|
|
||||||
|
StorageTypeAllocation(StorageType[] storageTypes,
|
||||||
|
BlockStoragePolicy specifiedStoragePolicy) {
|
||||||
|
Arrays.sort(storageTypes);
|
||||||
|
this.storageTypes = storageTypes;
|
||||||
|
this.specifiedStoragePolicy = specifiedStoragePolicy;
|
||||||
|
}
|
||||||
|
|
||||||
|
StorageType[] getStorageTypes() {
|
||||||
|
return storageTypes;
|
||||||
|
}
|
||||||
|
|
||||||
|
BlockStoragePolicy getSpecifiedStoragePolicy() {
|
||||||
|
return specifiedStoragePolicy;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setActualStoragePolicy(BlockStoragePolicy actualStoragePolicy) {
|
||||||
|
this.actualStoragePolicy = actualStoragePolicy;
|
||||||
|
}
|
||||||
|
|
||||||
|
BlockStoragePolicy getActualStoragePolicy() {
|
||||||
|
return actualStoragePolicy;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getStorageAllocationAsString
|
||||||
|
(Map<StorageType, Integer> storageType_countmap) {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for (Map.Entry<StorageType, Integer>
|
||||||
|
storageTypeCountEntry:storageType_countmap.entrySet()) {
|
||||||
|
sb.append(storageTypeCountEntry.getKey().name()+ ":"
|
||||||
|
+ storageTypeCountEntry.getValue() + ",");
|
||||||
|
}
|
||||||
|
if (sb.length() > 1) {
|
||||||
|
sb.deleteCharAt(sb.length()-1);
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getStorageAllocationAsString() {
|
||||||
|
Map<StorageType, Integer> storageType_countmap =
|
||||||
|
new EnumMap<>(StorageType.class);
|
||||||
|
for (StorageType storageType: storageTypes) {
|
||||||
|
Integer count = storageType_countmap.get(storageType);
|
||||||
|
if (count == null) {
|
||||||
|
storageType_countmap.put(storageType, 1);
|
||||||
|
} else {
|
||||||
|
storageType_countmap.put(storageType, count.intValue()+1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return (getStorageAllocationAsString(storageType_countmap));
|
||||||
|
}
|
||||||
|
|
||||||
|
String getStoragePolicyDescriptor() {
|
||||||
|
StringBuilder storagePolicyDescriptorSB = new StringBuilder();
|
||||||
|
if (actualStoragePolicy!=null) {
|
||||||
|
storagePolicyDescriptorSB.append(getStorageAllocationAsString())
|
||||||
|
.append("(")
|
||||||
|
.append(actualStoragePolicy.getName())
|
||||||
|
.append(")");
|
||||||
|
} else {
|
||||||
|
storagePolicyDescriptorSB.append(getStorageAllocationAsString());
|
||||||
|
}
|
||||||
|
return storagePolicyDescriptorSB.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean policyMatches() {
|
||||||
|
return specifiedStoragePolicy.equals(actualStoragePolicy);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return specifiedStoragePolicy.getName() + "|" + getStoragePolicyDescriptor();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hash(specifiedStoragePolicy,Arrays.hashCode(storageTypes));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object another) {
|
||||||
|
return (another instanceof StorageTypeAllocation &&
|
||||||
|
Objects.equals(specifiedStoragePolicy,
|
||||||
|
((StorageTypeAllocation)another).specifiedStoragePolicy) &&
|
||||||
|
Arrays.equals(storageTypes,
|
||||||
|
((StorageTypeAllocation)another).storageTypes));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -91,7 +91,8 @@ public class DFSck extends Configured implements Tool {
|
||||||
+ "blocks and files they belong to\n"
|
+ "blocks and files they belong to\n"
|
||||||
+ "\t-blocks\tprint out block report\n"
|
+ "\t-blocks\tprint out block report\n"
|
||||||
+ "\t-locations\tprint out locations for every block\n"
|
+ "\t-locations\tprint out locations for every block\n"
|
||||||
+ "\t-racks\tprint out network topology for data-node locations\n\n"
|
+ "\t-racks\tprint out network topology for data-node locations\n"
|
||||||
|
+ "\t-storagepolicies\tprint out storage policy summary for the blocks\n\n"
|
||||||
+ "\t-blockId\tprint out which file this blockId belongs to, locations"
|
+ "\t-blockId\tprint out which file this blockId belongs to, locations"
|
||||||
+ " (nodes, racks) of this block, and other diagnostics info"
|
+ " (nodes, racks) of this block, and other diagnostics info"
|
||||||
+ " (under replicated, corrupted or not, etc)\n\n"
|
+ " (under replicated, corrupted or not, etc)\n\n"
|
||||||
|
@ -264,6 +265,7 @@ public class DFSck extends Configured implements Tool {
|
||||||
else if (args[idx].equals("-blocks")) { url.append("&blocks=1"); }
|
else if (args[idx].equals("-blocks")) { url.append("&blocks=1"); }
|
||||||
else if (args[idx].equals("-locations")) { url.append("&locations=1"); }
|
else if (args[idx].equals("-locations")) { url.append("&locations=1"); }
|
||||||
else if (args[idx].equals("-racks")) { url.append("&racks=1"); }
|
else if (args[idx].equals("-racks")) { url.append("&racks=1"); }
|
||||||
|
else if (args[idx].equals("-storagepolicies")) { url.append("&storagepolicies=1"); }
|
||||||
else if (args[idx].equals("-list-corruptfileblocks")) {
|
else if (args[idx].equals("-list-corruptfileblocks")) {
|
||||||
url.append("&listcorruptfileblocks=1");
|
url.append("&listcorruptfileblocks=1");
|
||||||
doListCorruptFileBlocks = true;
|
doListCorruptFileBlocks = true;
|
||||||
|
|
|
@ -18,6 +18,18 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.anyBoolean;
|
||||||
|
import static org.mockito.Matchers.anyLong;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -41,7 +53,6 @@ import java.util.Set;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
@ -58,6 +69,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
@ -83,17 +95,7 @@ import org.apache.log4j.PatternLayout;
|
||||||
import org.apache.log4j.RollingFileAppender;
|
import org.apache.log4j.RollingFileAppender;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import com.google.common.collect.Sets;
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
import static org.mockito.Matchers.anyBoolean;
|
|
||||||
import static org.mockito.Matchers.anyLong;
|
|
||||||
import static org.mockito.Matchers.anyString;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A JUnit test for doing fsck
|
* A JUnit test for doing fsck
|
||||||
|
@ -1326,4 +1328,56 @@ public class TestFsck {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void writeFile(final DistributedFileSystem dfs,
|
||||||
|
Path dir, String fileName) throws IOException {
|
||||||
|
Path filePath = new Path(dir.toString() + Path.SEPARATOR + fileName);
|
||||||
|
final FSDataOutputStream out = dfs.create(filePath);
|
||||||
|
out.writeChars("teststring");
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeFile(final DistributedFileSystem dfs,
|
||||||
|
String dirName, String fileName, String StoragePolicy) throws IOException {
|
||||||
|
Path dirPath = new Path(dirName);
|
||||||
|
dfs.mkdirs(dirPath);
|
||||||
|
dfs.setStoragePolicy(dirPath, StoragePolicy);
|
||||||
|
writeFile(dfs, dirPath, fileName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test storage policy display
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testStoragePoliciesCK() throws Exception {
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(3)
|
||||||
|
.storageTypes(
|
||||||
|
new StorageType[] {StorageType.DISK, StorageType.ARCHIVE})
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
writeFile(dfs, "/testhot", "file", "HOT");
|
||||||
|
writeFile(dfs, "/testwarm", "file", "WARM");
|
||||||
|
writeFile(dfs, "/testcold", "file", "COLD");
|
||||||
|
String outStr = runFsck(conf, 0, true, "/", "-storagepolicies");
|
||||||
|
assertTrue(outStr.contains("DISK:3(HOT)"));
|
||||||
|
assertTrue(outStr.contains("DISK:1,ARCHIVE:2(WARM)"));
|
||||||
|
assertTrue(outStr.contains("ARCHIVE:3(COLD)"));
|
||||||
|
assertTrue(outStr.contains("All blocks satisfy specified storage policy."));
|
||||||
|
dfs.setStoragePolicy(new Path("/testhot"), "COLD");
|
||||||
|
dfs.setStoragePolicy(new Path("/testwarm"), "COLD");
|
||||||
|
outStr = runFsck(conf, 0, true, "/", "-storagepolicies");
|
||||||
|
assertTrue(outStr.contains("DISK:3(HOT)"));
|
||||||
|
assertTrue(outStr.contains("DISK:1,ARCHIVE:2(WARM)"));
|
||||||
|
assertTrue(outStr.contains("ARCHIVE:3(COLD)"));
|
||||||
|
assertFalse(outStr.contains("All blocks satisfy specified storage policy."));
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,201 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.StoragePolicySummary.StorageTypeAllocation;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestStoragePolicySummary {
|
||||||
|
|
||||||
|
private Map<String, Long> convertToStringMap(StoragePolicySummary sts) {
|
||||||
|
LinkedHashMap<String, Long> actualOutput = new LinkedHashMap<>();
|
||||||
|
for (Map.Entry<StorageTypeAllocation, Long> entry:
|
||||||
|
StoragePolicySummary.sortByComparator(sts.storageComboCounts)) {
|
||||||
|
actualOutput.put(entry.getKey().toString(), entry.getValue());
|
||||||
|
}
|
||||||
|
return actualOutput;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleHots() {
|
||||||
|
BlockStoragePolicySuite bsps = BlockStoragePolicySuite.createDefaultSuite();
|
||||||
|
StoragePolicySummary sts = new StoragePolicySummary(bsps.getAllPolicies());
|
||||||
|
BlockStoragePolicy hot = bsps.getPolicy("HOT");
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,StorageType.DISK},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,
|
||||||
|
StorageType.DISK,StorageType.DISK},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,
|
||||||
|
StorageType.DISK,StorageType.DISK,StorageType.DISK},hot);
|
||||||
|
Map<String, Long> actualOutput = convertToStringMap(sts);
|
||||||
|
Assert.assertEquals(4,actualOutput.size());
|
||||||
|
Map<String, Long> expectedOutput = new HashMap<>();
|
||||||
|
expectedOutput.put("HOT|DISK:1(HOT)", 1l);
|
||||||
|
expectedOutput.put("HOT|DISK:2(HOT)", 1l);
|
||||||
|
expectedOutput.put("HOT|DISK:3(HOT)", 1l);
|
||||||
|
expectedOutput.put("HOT|DISK:4(HOT)", 1l);
|
||||||
|
Assert.assertEquals(expectedOutput,actualOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleHotsWithDifferentCounts() {
|
||||||
|
BlockStoragePolicySuite bsps = BlockStoragePolicySuite.createDefaultSuite();
|
||||||
|
StoragePolicySummary sts = new StoragePolicySummary(bsps.getAllPolicies());
|
||||||
|
BlockStoragePolicy hot = bsps.getPolicy("HOT");
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,StorageType.DISK},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,StorageType.DISK},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,
|
||||||
|
StorageType.DISK,StorageType.DISK},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,
|
||||||
|
StorageType.DISK,StorageType.DISK},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,
|
||||||
|
StorageType.DISK,StorageType.DISK,StorageType.DISK},hot);
|
||||||
|
Map<String, Long> actualOutput = convertToStringMap(sts);
|
||||||
|
Assert.assertEquals(4,actualOutput.size());
|
||||||
|
Map<String, Long> expectedOutput = new HashMap<>();
|
||||||
|
expectedOutput.put("HOT|DISK:1(HOT)", 1l);
|
||||||
|
expectedOutput.put("HOT|DISK:2(HOT)", 2l);
|
||||||
|
expectedOutput.put("HOT|DISK:3(HOT)", 2l);
|
||||||
|
expectedOutput.put("HOT|DISK:4(HOT)", 1l);
|
||||||
|
Assert.assertEquals(expectedOutput,actualOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleWarmsInDifferentOrder() {
|
||||||
|
BlockStoragePolicySuite bsps = BlockStoragePolicySuite.createDefaultSuite();
|
||||||
|
StoragePolicySummary sts = new StoragePolicySummary(bsps.getAllPolicies());
|
||||||
|
BlockStoragePolicy warm = bsps.getPolicy("WARM");
|
||||||
|
//DISK:1,ARCHIVE:1
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,StorageType.ARCHIVE},warm);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,StorageType.DISK},warm);
|
||||||
|
//DISK:2,ARCHIVE:1
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.DISK,StorageType.DISK},warm);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,
|
||||||
|
StorageType.ARCHIVE,StorageType.DISK},warm);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,
|
||||||
|
StorageType.DISK,StorageType.ARCHIVE},warm);
|
||||||
|
//DISK:1,ARCHIVE:2
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,
|
||||||
|
StorageType.ARCHIVE,StorageType.ARCHIVE},warm);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.DISK,StorageType.ARCHIVE},warm);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE,StorageType.DISK},warm);
|
||||||
|
//DISK:2,ARCHIVE:2
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE,StorageType.DISK,StorageType.DISK},warm);
|
||||||
|
Map<String, Long> actualOutput = convertToStringMap(sts);
|
||||||
|
Assert.assertEquals(4,actualOutput.size());
|
||||||
|
Map<String, Long> expectedOutput = new HashMap<>();
|
||||||
|
expectedOutput.put("WARM|DISK:1,ARCHIVE:1(WARM)", 2l);
|
||||||
|
expectedOutput.put("WARM|DISK:2,ARCHIVE:1", 3l);
|
||||||
|
expectedOutput.put("WARM|DISK:1,ARCHIVE:2(WARM)", 3l);
|
||||||
|
expectedOutput.put("WARM|DISK:2,ARCHIVE:2", 1l);
|
||||||
|
Assert.assertEquals(expectedOutput,actualOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDifferentSpecifiedPolicies() {
|
||||||
|
BlockStoragePolicySuite bsps = BlockStoragePolicySuite.createDefaultSuite();
|
||||||
|
StoragePolicySummary sts = new StoragePolicySummary(bsps.getAllPolicies());
|
||||||
|
BlockStoragePolicy hot = bsps.getPolicy("HOT");
|
||||||
|
BlockStoragePolicy warm = bsps.getPolicy("WARM");
|
||||||
|
BlockStoragePolicy cold = bsps.getPolicy("COLD");
|
||||||
|
//DISK:3
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,StorageType.DISK,StorageType.DISK},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,StorageType.DISK,StorageType.DISK},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,StorageType.DISK,StorageType.DISK},warm);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,StorageType.DISK,StorageType.DISK},cold);
|
||||||
|
//DISK:1,ARCHIVE:2
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,
|
||||||
|
StorageType.ARCHIVE,StorageType.ARCHIVE},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.DISK,StorageType.ARCHIVE},warm);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE,StorageType.DISK},cold);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE,StorageType.DISK},cold);
|
||||||
|
//ARCHIVE:3
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE,StorageType.ARCHIVE},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE,StorageType.ARCHIVE},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE,StorageType.ARCHIVE},warm);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE,StorageType.ARCHIVE},cold);
|
||||||
|
Map<String, Long> actualOutput = convertToStringMap(sts);
|
||||||
|
Assert.assertEquals(9,actualOutput.size());
|
||||||
|
Map<String, Long> expectedOutput = new HashMap<>();
|
||||||
|
expectedOutput.put("HOT|DISK:3(HOT)", 2l);
|
||||||
|
expectedOutput.put("COLD|DISK:1,ARCHIVE:2(WARM)", 2l);
|
||||||
|
expectedOutput.put("HOT|ARCHIVE:3(COLD)", 2l);
|
||||||
|
expectedOutput.put("WARM|DISK:3(HOT)", 1l);
|
||||||
|
expectedOutput.put("COLD|DISK:3(HOT)", 1l);
|
||||||
|
expectedOutput.put("WARM|ARCHIVE:3(COLD)", 1l);
|
||||||
|
expectedOutput.put("WARM|DISK:1,ARCHIVE:2(WARM)", 1l);
|
||||||
|
expectedOutput.put("COLD|ARCHIVE:3(COLD)", 1l);
|
||||||
|
expectedOutput.put("HOT|DISK:1,ARCHIVE:2(WARM)", 1l);
|
||||||
|
Assert.assertEquals(expectedOutput,actualOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSortInDescendingOrder() {
|
||||||
|
BlockStoragePolicySuite bsps = BlockStoragePolicySuite.createDefaultSuite();
|
||||||
|
StoragePolicySummary sts = new StoragePolicySummary(bsps.getAllPolicies());
|
||||||
|
BlockStoragePolicy hot = bsps.getPolicy("HOT");
|
||||||
|
BlockStoragePolicy warm = bsps.getPolicy("WARM");
|
||||||
|
BlockStoragePolicy cold = bsps.getPolicy("COLD");
|
||||||
|
//DISK:3
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,StorageType.DISK,StorageType.DISK},hot);
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,StorageType.DISK,StorageType.DISK},hot);
|
||||||
|
//DISK:1,ARCHIVE:2
|
||||||
|
sts.add(new StorageType[]{StorageType.DISK,
|
||||||
|
StorageType.ARCHIVE,StorageType.ARCHIVE},warm);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.DISK,StorageType.ARCHIVE},warm);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE,StorageType.DISK},warm);
|
||||||
|
//ARCHIVE:3
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE,StorageType.ARCHIVE},cold);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE,StorageType.ARCHIVE},cold);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE,StorageType.ARCHIVE},cold);
|
||||||
|
sts.add(new StorageType[]{StorageType.ARCHIVE,
|
||||||
|
StorageType.ARCHIVE,StorageType.ARCHIVE},cold);
|
||||||
|
Map<String, Long> actualOutput = convertToStringMap(sts);
|
||||||
|
Assert.assertEquals(3,actualOutput.size());
|
||||||
|
Map<String, Long> expectedOutput = new LinkedHashMap<>();
|
||||||
|
expectedOutput.put("COLD|ARCHIVE:3(COLD)", 4l);
|
||||||
|
expectedOutput.put("WARM|DISK:1,ARCHIVE:2(WARM)", 3l);
|
||||||
|
expectedOutput.put("HOT|DISK:3(HOT)", 2l);
|
||||||
|
Assert.assertEquals(expectedOutput.toString(),actualOutput.toString());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue