HDFS-5659. dfsadmin -report doesn't output cache information properly. Contributed by Andrew Wang.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1554893 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a44ddd674a
commit
b4eb963c3c
|
@ -928,8 +928,10 @@ public class StringUtils {
|
|||
* @param args List of arguments.
|
||||
* @return null if the option was not found; the value of the
|
||||
* option otherwise.
|
||||
* @throws IllegalArgumentException if the option's argument is not present
|
||||
*/
|
||||
public static String popOptionWithArgument(String name, List<String> args) {
|
||||
public static String popOptionWithArgument(String name, List<String> args)
|
||||
throws IllegalArgumentException {
|
||||
String val = null;
|
||||
for (Iterator<String> iter = args.iterator(); iter.hasNext(); ) {
|
||||
String cur = iter.next();
|
||||
|
@ -939,7 +941,7 @@ public class StringUtils {
|
|||
} else if (cur.equals(name)) {
|
||||
iter.remove();
|
||||
if (!iter.hasNext()) {
|
||||
throw new RuntimeException("option " + name + " requires 1 " +
|
||||
throw new IllegalArgumentException("option " + name + " requires 1 " +
|
||||
"argument.");
|
||||
}
|
||||
val = iter.next();
|
||||
|
|
|
@ -462,6 +462,9 @@ Trunk (Unreleased)
|
|||
processing cache reports that refer to a block not known to the
|
||||
BlockManager. (cmccabe via wang)
|
||||
|
||||
HDFS-5659. dfsadmin -report doesn't output cache information properly.
|
||||
(wang)
|
||||
|
||||
BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-4985. Add storage type to the protocol and expose it in block report
|
||||
|
|
|
@ -511,21 +511,7 @@ public class PBHelper {
|
|||
|
||||
static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
|
||||
if (di == null) return null;
|
||||
DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
|
||||
if (di.getNetworkLocation() != null) {
|
||||
builder.setLocation(di.getNetworkLocation());
|
||||
}
|
||||
|
||||
return builder.
|
||||
setId(PBHelper.convert((DatanodeID) di)).
|
||||
setCapacity(di.getCapacity()).
|
||||
setDfsUsed(di.getDfsUsed()).
|
||||
setRemaining(di.getRemaining()).
|
||||
setBlockPoolUsed(di.getBlockPoolUsed()).
|
||||
setLastUpdate(di.getLastUpdate()).
|
||||
setXceiverCount(di.getXceiverCount()).
|
||||
setAdminState(PBHelper.convert(di.getAdminState())).
|
||||
build();
|
||||
return convert(di);
|
||||
}
|
||||
|
||||
|
||||
|
@ -569,15 +555,20 @@ public class PBHelper {
|
|||
|
||||
public static DatanodeInfoProto convert(DatanodeInfo info) {
|
||||
DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
|
||||
builder.setBlockPoolUsed(info.getBlockPoolUsed());
|
||||
builder.setAdminState(PBHelper.convert(info.getAdminState()));
|
||||
builder.setCapacity(info.getCapacity())
|
||||
.setDfsUsed(info.getDfsUsed())
|
||||
if (info.getNetworkLocation() != null) {
|
||||
builder.setLocation(info.getNetworkLocation());
|
||||
}
|
||||
builder
|
||||
.setId(PBHelper.convert((DatanodeID)info))
|
||||
.setLastUpdate(info.getLastUpdate())
|
||||
.setLocation(info.getNetworkLocation())
|
||||
.setCapacity(info.getCapacity())
|
||||
.setDfsUsed(info.getDfsUsed())
|
||||
.setRemaining(info.getRemaining())
|
||||
.setBlockPoolUsed(info.getBlockPoolUsed())
|
||||
.setCacheCapacity(info.getCacheCapacity())
|
||||
.setCacheUsed(info.getCacheUsed())
|
||||
.setLastUpdate(info.getLastUpdate())
|
||||
.setXceiverCount(info.getXceiverCount())
|
||||
.setAdminState(PBHelper.convert(info.getAdminState()))
|
||||
.build();
|
||||
return builder.build();
|
||||
}
|
||||
|
|
|
@ -84,7 +84,12 @@ public class CacheAdmin extends Configured implements Tool {
|
|||
for (int j = 1; j < args.length; j++) {
|
||||
argsList.add(args[j]);
|
||||
}
|
||||
return command.run(getConf(), argsList);
|
||||
try {
|
||||
return command.run(getConf(), argsList);
|
||||
} catch (IllegalArgumentException e) {
|
||||
System.err.println(prettifyException(e));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] argsArray) throws IOException {
|
||||
|
@ -135,6 +140,20 @@ public class CacheAdmin extends Configured implements Tool {
|
|||
return maxTtl;
|
||||
}
|
||||
|
||||
private static Expiration parseExpirationString(String ttlString)
|
||||
throws IOException {
|
||||
Expiration ex = null;
|
||||
if (ttlString != null) {
|
||||
if (ttlString.equalsIgnoreCase("never")) {
|
||||
ex = CacheDirectiveInfo.Expiration.NEVER;
|
||||
} else {
|
||||
long ttl = DFSUtil.parseRelativeTime(ttlString);
|
||||
ex = CacheDirectiveInfo.Expiration.newRelative(ttl);
|
||||
}
|
||||
}
|
||||
return ex;
|
||||
}
|
||||
|
||||
interface Command {
|
||||
String getName();
|
||||
String getShortUsage();
|
||||
|
@ -171,6 +190,7 @@ public class CacheAdmin extends Configured implements Tool {
|
|||
listing.addRow("<time-to-live>", "How long the directive is " +
|
||||
"valid. Can be specified in minutes, hours, and days, e.g. " +
|
||||
"30m, 4h, 2d. Valid units are [smhd]." +
|
||||
" \"never\" indicates a directive that never expires." +
|
||||
" If unspecified, the directive never expires.");
|
||||
return getShortUsage() + "\n" +
|
||||
"Add a new cache directive.\n\n" +
|
||||
|
@ -203,15 +223,15 @@ public class CacheAdmin extends Configured implements Tool {
|
|||
}
|
||||
|
||||
String ttlString = StringUtils.popOptionWithArgument("-ttl", args);
|
||||
if (ttlString != null) {
|
||||
try {
|
||||
long ttl = DFSUtil.parseRelativeTime(ttlString);
|
||||
builder.setExpiration(CacheDirectiveInfo.Expiration.newRelative(ttl));
|
||||
} catch (IOException e) {
|
||||
System.err.println(
|
||||
"Error while parsing ttl value: " + e.getMessage());
|
||||
return 1;
|
||||
try {
|
||||
Expiration ex = parseExpirationString(ttlString);
|
||||
if (ex != null) {
|
||||
builder.setExpiration(ex);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
System.err.println(
|
||||
"Error while parsing ttl value: " + e.getMessage());
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (!args.isEmpty()) {
|
||||
|
@ -326,7 +346,7 @@ public class CacheAdmin extends Configured implements Tool {
|
|||
listing.addRow("<time-to-live>", "How long the directive is " +
|
||||
"valid. Can be specified in minutes, hours, and days, e.g. " +
|
||||
"30m, 4h, 2d. Valid units are [smhd]." +
|
||||
" If unspecified, the directive never expires.");
|
||||
" \"never\" indicates a directive that never expires.");
|
||||
return getShortUsage() + "\n" +
|
||||
"Modify a cache directive.\n\n" +
|
||||
listing.toString();
|
||||
|
@ -362,17 +382,16 @@ public class CacheAdmin extends Configured implements Tool {
|
|||
modified = true;
|
||||
}
|
||||
String ttlString = StringUtils.popOptionWithArgument("-ttl", args);
|
||||
if (ttlString != null) {
|
||||
long ttl;
|
||||
try {
|
||||
ttl = DFSUtil.parseRelativeTime(ttlString);
|
||||
} catch (IOException e) {
|
||||
System.err.println(
|
||||
"Error while parsing ttl value: " + e.getMessage());
|
||||
return 1;
|
||||
try {
|
||||
Expiration ex = parseExpirationString(ttlString);
|
||||
if (ex != null) {
|
||||
builder.setExpiration(ex);
|
||||
modified = true;
|
||||
}
|
||||
builder.setExpiration(CacheDirectiveInfo.Expiration.newRelative(ttl));
|
||||
modified = true;
|
||||
} catch (IOException e) {
|
||||
System.err.println(
|
||||
"Error while parsing ttl value: " + e.getMessage());
|
||||
return 1;
|
||||
}
|
||||
if (!args.isEmpty()) {
|
||||
System.err.println("Can't understand argument: " + args.get(0));
|
||||
|
|
|
@ -36,6 +36,7 @@ import java.util.Set;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.HdfsBlockLocation;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -82,7 +83,11 @@ public class TestFsDatasetCache {
|
|||
|
||||
// Most Linux installs allow a default of 64KB locked memory
|
||||
private static final long CACHE_CAPACITY = 64 * 1024;
|
||||
private static final long BLOCK_SIZE = 4096;
|
||||
// mlock always locks the entire page. So we don't need to deal with this
|
||||
// rounding, use the OS page size for the block size.
|
||||
private static final long PAGE_SIZE =
|
||||
NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
|
||||
private static final long BLOCK_SIZE = PAGE_SIZE;
|
||||
|
||||
private static Configuration conf;
|
||||
private static MiniDFSCluster cluster = null;
|
||||
|
@ -451,4 +456,27 @@ public class TestFsDatasetCache {
|
|||
}
|
||||
}, 100, 10000);
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testPageRounder() throws Exception {
|
||||
// Write a small file
|
||||
Path fileName = new Path("/testPageRounder");
|
||||
final int smallBlocks = 512; // This should be smaller than the page size
|
||||
assertTrue("Page size should be greater than smallBlocks!",
|
||||
PAGE_SIZE > smallBlocks);
|
||||
final int numBlocks = 5;
|
||||
final int fileLen = smallBlocks * numBlocks;
|
||||
FSDataOutputStream out =
|
||||
fs.create(fileName, false, 4096, (short)1, smallBlocks);
|
||||
out.write(new byte[fileLen]);
|
||||
out.close();
|
||||
HdfsBlockLocation[] locs = (HdfsBlockLocation[])fs.getFileBlockLocations(
|
||||
fileName, 0, fileLen);
|
||||
// Cache the file and check the sizes match the page size
|
||||
setHeartbeatResponse(cacheBlocks(locs));
|
||||
verifyExpectedCacheUsage(PAGE_SIZE * numBlocks, numBlocks);
|
||||
// Uncache and check that it decrements by the page size too
|
||||
setHeartbeatResponse(uncacheBlocks(locs));
|
||||
verifyExpectedCacheUsage(0, 0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,7 +65,9 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
|
|||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
|
||||
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||
import org.apache.hadoop.hdfs.protocol.CachePoolStats;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
|
||||
|
@ -105,7 +107,7 @@ public class TestCacheDirectives {
|
|||
EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
|
||||
}
|
||||
|
||||
private static final long BLOCK_SIZE = 512;
|
||||
private static final long BLOCK_SIZE = 4096;
|
||||
private static final int NUM_DATANODES = 4;
|
||||
// Most Linux installs will allow non-root users to lock 64KB.
|
||||
// In this test though, we stub out mlock so this doesn't matter.
|
||||
|
@ -835,6 +837,24 @@ public class TestCacheDirectives {
|
|||
waitForCachedBlocks(namenode, expected, expected,
|
||||
"testWaitForCachedReplicas:1");
|
||||
}
|
||||
|
||||
// Check that the datanodes have the right cache values
|
||||
DatanodeInfo[] live = dfs.getDataNodeStats(DatanodeReportType.LIVE);
|
||||
assertEquals("Unexpected number of live nodes", NUM_DATANODES, live.length);
|
||||
long totalUsed = 0;
|
||||
for (DatanodeInfo dn : live) {
|
||||
final long cacheCapacity = dn.getCacheCapacity();
|
||||
final long cacheUsed = dn.getCacheUsed();
|
||||
final long cacheRemaining = dn.getCacheRemaining();
|
||||
assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
|
||||
assertEquals("Capacity not equal to used + remaining",
|
||||
cacheCapacity, cacheUsed + cacheRemaining);
|
||||
assertEquals("Remaining not equal to capacity - used",
|
||||
cacheCapacity - cacheUsed, cacheRemaining);
|
||||
totalUsed += cacheUsed;
|
||||
}
|
||||
assertEquals(expected*BLOCK_SIZE, totalUsed);
|
||||
|
||||
// Uncache and check each path in sequence
|
||||
RemoteIterator<CacheDirectiveEntry> entries =
|
||||
new CacheDirectiveIterator(nnRpc, null);
|
||||
|
@ -974,7 +994,6 @@ public class TestCacheDirectives {
|
|||
(4+3) * numBlocksPerFile * BLOCK_SIZE,
|
||||
3, 2,
|
||||
poolInfo, "testWaitForCachedReplicasInDirectory:2:pool");
|
||||
|
||||
// remove and watch numCached go to 0
|
||||
dfs.removeCacheDirective(id);
|
||||
dfs.removeCacheDirective(id2);
|
||||
|
|
Loading…
Reference in New Issue