HDFS-5451. Add byte and file statistics to PathBasedCacheEntry. Contributed by Colin Patrick McCabe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1543958 13f79535-47bb-0310-9956-ffa450edef68
commit 916ab9286b (parent 8313697752)
@@ -204,6 +204,9 @@ Trunk (Unreleased)
 
     HDFS-5525. Inline dust templates for new Web UI. (Haohui Mai via jing9)
 
+    HDFS-5451. Add byte and file statistics to PathBasedCacheEntry.
+    (Colin Patrick McCabe via Andrew Wang)
+
   OPTIMIZATIONS
 
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
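For orientation: after this patch the NameNode's CacheReplicationMonitor computes three statistics per cache directive (bytesNeeded, bytesCached, filesAffected), and clients can read them through the getters added to PathBasedCacheDirective below. A minimal sketch of consuming them on the client side (the filesystem setup and the empty listing filter are illustrative assumptions, not part of the patch):

    // Sketch only: reads the statistics added by this patch via the new getters.
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;

    public class CacheDirectiveStatsExample {
      static void printStats(DistributedFileSystem dfs) throws Exception {
        RemoteIterator<PathBasedCacheDirective> iter =
            dfs.listPathBasedCacheDirectives(
                new PathBasedCacheDirective.Builder().build());
        while (iter.hasNext()) {
          PathBasedCacheDirective d = iter.next();
          // Populated by the NameNode's CacheReplicationMonitor on each rescan.
          System.out.println(d.getId() + ": bytesNeeded=" + d.getBytesNeeded()
              + " bytesCached=" + d.getBytesCached()
              + " filesAffected=" + d.getFilesAffected());
        }
      }
    }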
@@ -37,6 +37,9 @@ public class PathBasedCacheDirective {
     private Path path;
     private Short replication;
     private String pool;
+    private Long bytesNeeded;
+    private Long bytesCached;
+    private Long filesAffected;
 
     /**
      * Builds a new PathBasedCacheDirective populated with the set properties.

@@ -44,7 +47,8 @@ public class PathBasedCacheDirective {
      * @return New PathBasedCacheDirective.
      */
     public PathBasedCacheDirective build() {
-      return new PathBasedCacheDirective(id, path, replication, pool);
+      return new PathBasedCacheDirective(id, path, replication, pool,
+          bytesNeeded, bytesCached, filesAffected);
     }
 
     /**

@@ -62,6 +66,9 @@ public class PathBasedCacheDirective {
       this.path = directive.getPath();
       this.replication = directive.getReplication();
       this.pool = directive.getPool();
+      this.bytesNeeded = directive.bytesNeeded;
+      this.bytesCached = directive.bytesCached;
+      this.filesAffected = directive.filesAffected;
     }
 
     /**

@@ -97,6 +104,39 @@ public class PathBasedCacheDirective {
       return this;
     }
 
+    /**
+     * Sets the bytes needed by this directive.
+     *
+     * @param bytesNeeded The bytes needed.
+     * @return This builder, for call chaining.
+     */
+    public Builder setBytesNeeded(Long bytesNeeded) {
+      this.bytesNeeded = bytesNeeded;
+      return this;
+    }
+
+    /**
+     * Sets the bytes cached by this directive.
+     *
+     * @param bytesCached The bytes cached.
+     * @return This builder, for call chaining.
+     */
+    public Builder setBytesCached(Long bytesCached) {
+      this.bytesCached = bytesCached;
+      return this;
+    }
+
+    /**
+     * Sets the files affected by this directive.
+     *
+     * @param filesAffected The files affected.
+     * @return This builder, for call chaining.
+     */
+    public Builder setFilesAffected(Long filesAffected) {
+      this.filesAffected = filesAffected;
+      return this;
+    }
+
     /**
      * Sets the pool used in this request.
      *

@@ -113,12 +153,19 @@ public class PathBasedCacheDirective {
   private final Path path;
   private final Short replication;
   private final String pool;
+  private final Long bytesNeeded;
+  private final Long bytesCached;
+  private final Long filesAffected;
 
-  PathBasedCacheDirective(Long id, Path path, Short replication, String pool) {
+  PathBasedCacheDirective(Long id, Path path, Short replication, String pool,
+      Long bytesNeeded, Long bytesCached, Long filesAffected) {
     this.id = id;
     this.path = path;
     this.replication = replication;
     this.pool = pool;
+    this.bytesNeeded = bytesNeeded;
+    this.bytesCached = bytesCached;
+    this.filesAffected = filesAffected;
   }
 
   /**

@@ -148,6 +195,27 @@ public class PathBasedCacheDirective {
   public String getPool() {
     return pool;
   }
 
+  /**
+   * @return The bytes needed.
+   */
+  public Long getBytesNeeded() {
+    return bytesNeeded;
+  }
+
+  /**
+   * @return The bytes cached.
+   */
+  public Long getBytesCached() {
+    return bytesCached;
+  }
+
+  /**
+   * @return The files affected.
+   */
+  public Long getFilesAffected() {
+    return filesAffected;
+  }
+
   @Override
   public boolean equals(Object o) {

@@ -195,6 +263,18 @@ public class PathBasedCacheDirective {
       builder.append(prefix).append("pool: ").append(pool);
       prefix = ",";
     }
+    if (bytesNeeded != null) {
+      builder.append(prefix).append("bytesNeeded: ").append(bytesNeeded);
+      prefix = ",";
+    }
+    if (bytesCached != null) {
+      builder.append(prefix).append("bytesCached: ").append(bytesCached);
+      prefix = ",";
+    }
+    if (filesAffected != null) {
+      builder.append(prefix).append("filesAffected: ").append(filesAffected);
+      prefix = ",";
+    }
     builder.append("}");
     return builder.toString();
   }
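As a quick illustration of the Builder additions above, a sketch that exercises the new setters and the extended toString(); the numeric values are hypothetical, and in normal operation the NameNode, not the caller, fills these statistics in:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;

    public class DirectiveBuilderExample {
      public static void main(String[] args) {
        PathBasedCacheDirective directive = new PathBasedCacheDirective.Builder().
            setPath(new Path("/warm-data")).
            setReplication((short) 2).
            setPool("pool1").
            setBytesNeeded(256L * 1024 * 1024).   // bytes this directive should cache
            setBytesCached(128L * 1024 * 1024).   // bytes currently cached
            setFilesAffected(3L).                 // files covered by the directive
            build();
        System.out.println(directive);  // toString() now reports the three statistics
      }
    }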
@@ -35,6 +35,9 @@ public final class PathBasedCacheEntry {
   private final String path;
   private final short replication;
   private final CachePool pool;
+  private long bytesNeeded;
+  private long bytesCached;
+  private long filesAffected;
 
   public PathBasedCacheEntry(long entryId, String path,
       short replication, CachePool pool) {

@@ -46,6 +49,9 @@ public final class PathBasedCacheEntry {
     this.replication = replication;
     Preconditions.checkNotNull(path);
     this.pool = pool;
+    this.bytesNeeded = 0;
+    this.bytesCached = 0;
+    this.filesAffected = 0;
   }
 
   public long getEntryId() {

@@ -70,6 +76,9 @@ public final class PathBasedCacheEntry {
         setPath(new Path(path)).
         setReplication(replication).
         setPool(pool.getPoolName()).
+        setBytesNeeded(bytesNeeded).
+        setBytesCached(bytesCached).
+        setFilesAffected(filesAffected).
         build();
   }
 

@@ -80,6 +89,9 @@ public final class PathBasedCacheEntry {
         append(", path:").append(path).
         append(", replication:").append(replication).
         append(", pool:").append(pool).
+        append(", bytesNeeded:").append(bytesNeeded).
+        append(", bytesCached:").append(bytesCached).
+        append(", filesAffected:").append(filesAffected).
         append(" }");
     return builder.toString();
   }

@@ -99,4 +111,40 @@ public final class PathBasedCacheEntry {
   public int hashCode() {
     return new HashCodeBuilder().append(entryId).toHashCode();
   }
+
+  public long getBytesNeeded() {
+    return bytesNeeded;
+  }
+
+  public void clearBytesNeeded() {
+    this.bytesNeeded = 0;
+  }
+
+  public void addBytesNeeded(long toAdd) {
+    this.bytesNeeded += toAdd;
+  }
+
+  public long getBytesCached() {
+    return bytesCached;
+  }
+
+  public void clearBytesCached() {
+    this.bytesCached = 0;
+  }
+
+  public void addBytesCached(long toAdd) {
+    this.bytesCached += toAdd;
+  }
+
+  public long getFilesAffected() {
+    return filesAffected;
+  }
+
+  public void clearFilesAffected() {
+    this.filesAffected = 0;
+  }
+
+  public void incrementFilesAffected() {
+    this.filesAffected++;
+  }
 };
@@ -1583,6 +1583,15 @@ public class PBHelper {
     if (directive.getPool() != null) {
       builder.setPool(directive.getPool());
     }
+    if (directive.getBytesNeeded() != null) {
+      builder.setBytesNeeded(directive.getBytesNeeded());
+    }
+    if (directive.getBytesCached() != null) {
+      builder.setBytesCached(directive.getBytesCached());
+    }
+    if (directive.getFilesAffected() != null) {
+      builder.setFilesAffected(directive.getFilesAffected());
+    }
     return builder.build();
   }
 

@@ -1603,6 +1612,15 @@ public class PBHelper {
     if (proto.hasPool()) {
       builder.setPool(proto.getPool());
     }
+    if (proto.hasBytesNeeded()) {
+      builder.setBytesNeeded(proto.getBytesNeeded());
+    }
+    if (proto.hasBytesCached()) {
+      builder.setBytesCached(proto.getBytesCached());
+    }
+    if (proto.hasFilesAffected()) {
+      builder.setFilesAffected(proto.getFilesAffected());
+    }
     return builder.build();
   }
 
@@ -198,11 +198,6 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
     namesystem.writeLock();
     try {
       rescanPathBasedCacheEntries();
-    } finally {
-      namesystem.writeUnlock();
-    }
-    namesystem.writeLock();
-    try {
       rescanCachedBlockMap();
       blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
     } finally {

@@ -220,6 +215,9 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
     FSDirectory fsDir = namesystem.getFSDirectory();
     for (PathBasedCacheEntry pce : cacheManager.getEntriesById().values()) {
       scannedDirectives++;
+      pce.clearBytesNeeded();
+      pce.clearBytesCached();
+      pce.clearFilesAffected();
       String path = pce.getPath();
       INode node;
       try {

@@ -258,12 +256,18 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
    * @param file The file.
    */
   private void rescanFile(PathBasedCacheEntry pce, INodeFile file) {
+    pce.incrementFilesAffected();
     BlockInfo[] blockInfos = file.getBlocks();
+    long cachedTotal = 0;
+    long neededTotal = 0;
     for (BlockInfo blockInfo : blockInfos) {
       if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) {
         // We don't try to cache blocks that are under construction.
         continue;
       }
+      long neededByBlock =
+          pce.getReplication() * blockInfo.getNumBytes();
+      neededTotal += neededByBlock;
       Block block = new Block(blockInfo.getBlockId());
       CachedBlock ncblock = new CachedBlock(block.getBlockId(),
           pce.getReplication(), mark);

@@ -271,6 +275,18 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       if (ocblock == null) {
         cachedBlocks.put(ncblock);
       } else {
+        // Update bytesUsed using the current replication levels.
+        // Assumptions: we assume that all the blocks are the same length
+        // on each datanode. We can assume this because we're only caching
+        // blocks in state COMMITTED.
+        // Note that if two directives are caching the same block(s), they will
+        // both get them added to their bytesCached.
+        List<DatanodeDescriptor> cachedOn =
+            ocblock.getDatanodes(Type.CACHED);
+        long cachedByBlock = Math.min(cachedOn.size(), pce.getReplication()) *
+            blockInfo.getNumBytes();
+        cachedTotal += cachedByBlock;
+
         if (mark != ocblock.getMark()) {
           // Mark hasn't been set in this scan, so update replication and mark.
           ocblock.setReplicationAndMark(pce.getReplication(), mark);

@@ -282,6 +298,12 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
         }
       }
     }
+    pce.addBytesNeeded(neededTotal);
+    pce.addBytesCached(cachedTotal);
+    if (LOG.isTraceEnabled()) {
+      LOG.debug("Directive " + pce.getEntryId() + " is caching " +
+          file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal);
+    }
   }
 
   /**
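To make the per-block accounting in rescanFile() above concrete, a small worked example with hypothetical numbers: two COMPLETE 128 MB blocks, a directive replication of 3, and each block currently cached on 2 datanodes.

    // Hypothetical numbers, mirroring the arithmetic used in rescanFile() above.
    long blockSize    = 128L * 1024 * 1024;  // 128 MB per block
    short replication = 3;                   // requested by the directive
    int cachedOn      = 2;                   // datanodes currently caching each block

    long neededPerBlock = replication * blockSize;                      // 402653184 (384 MB)
    long cachedPerBlock = Math.min(cachedOn, replication) * blockSize;  // 268435456 (256 MB)

    // For a file with two such blocks, the rescan would record:
    //   pce.addBytesNeeded(2 * neededPerBlock);  // 768 MB for this file
    //   pce.addBytesCached(2 * cachedPerBlock);  // 512 MB for this file
    //   pce.incrementFilesAffected();            // once per scanned file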
@@ -525,6 +525,21 @@ class BPOfferService {
     }
   }
 
+  private String blockIdArrayToString(long ids[]) {
+    long maxNumberOfBlocksToLog = dn.getMaxNumberOfBlocksToLog();
+    StringBuilder bld = new StringBuilder();
+    String prefix = "";
+    for (int i = 0; i < ids.length; i++) {
+      if (i >= maxNumberOfBlocksToLog) {
+        bld.append("...");
+        break;
+      }
+      bld.append(prefix).append(ids[i]);
+      prefix = ", ";
+    }
+    return bld.toString();
+  }
+
   /**
    * This method should handle all commands from Active namenode except
    * DNA_REGISTER which should be handled earlier itself.

@@ -565,12 +580,16 @@ class BPOfferService {
       dn.metrics.incrBlocksRemoved(toDelete.length);
       break;
     case DatanodeProtocol.DNA_CACHE:
-      LOG.info("DatanodeCommand action: DNA_CACHE");
+      LOG.info("DatanodeCommand action: DNA_CACHE for " +
+          blockIdCmd.getBlockPoolId() + " of [" +
+          blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
       dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
       dn.metrics.incrBlocksCached(blockIdCmd.getBlockIds().length);
       break;
     case DatanodeProtocol.DNA_UNCACHE:
-      LOG.info("DatanodeCommand action: DNA_UNCACHE");
+      LOG.info("DatanodeCommand action: DNA_UNCACHE for " +
+          blockIdCmd.getBlockPoolId() + " of [" +
+          blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
       dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
       dn.metrics.incrBlocksUncached(blockIdCmd.getBlockIds().length);
       break;
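For reference, the truncation in blockIdArrayToString() above keeps the new datanode log lines bounded. A hypothetical, standalone demonstration (the helper below restates the same logic, with the cap passed as a parameter rather than read from the DataNode):

    public class BlockIdLogDemo {
      static String truncate(long[] ids, long maxToLog) {
        StringBuilder bld = new StringBuilder();
        String prefix = "";
        for (int i = 0; i < ids.length; i++) {
          if (i >= maxToLog) {
            bld.append("...");
            break;
          }
          bld.append(prefix).append(ids[i]);
          prefix = ", ";
        }
        return bld.toString();
      }

      public static void main(String[] args) {
        // Prints "101, 102, 103..." - the first maxToLog ids, then an ellipsis.
        System.out.println(truncate(new long[] {101, 102, 103, 104, 105}, 3));
      }
    }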
@@ -459,7 +459,7 @@ class BPServiceActor implements Runnable {
       long sendCost = sendTime - createTime;
       dn.getMetrics().addCacheReport(sendCost);
       LOG.info("CacheReport of " + blockIds.size()
-          + " blocks took " + createCost + " msec to generate and "
+          + " block(s) took " + createCost + " msec to generate and "
           + sendCost + " msecs for RPC and NN processing");
     }
     return cmd;
@@ -207,6 +207,7 @@ public class DataNode extends Configured
   private SecureResources secureResources = null;
   private AbstractList<File> dataDirs;
   private Configuration conf;
+  private final long maxNumberOfBlocksToLog;
 
   private final List<String> usersWithLocalPathAccess;
   private boolean connectToDnViaHostname;

@@ -231,6 +232,8 @@ public class DataNode extends Configured
            final AbstractList<File> dataDirs,
            final SecureResources resources) throws IOException {
     super(conf);
+    this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
+        DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
 
     this.usersWithLocalPathAccess = Arrays.asList(
         conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY));

@@ -1031,6 +1034,10 @@ public class DataNode extends Configured
     }
   }
 
+  public long getMaxNumberOfBlocksToLog() {
+    return maxNumberOfBlocksToLog;
+  }
+
   @Override
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
       Token<BlockTokenIdentifier> token) throws IOException {
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.tools;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 

@@ -394,7 +395,7 @@ public class CacheAdmin extends Configured implements Tool {
 
     @Override
     public String getShortUsage() {
-      return "[" + getName() + " [-path <path>] [-pool <pool>]]\n";
+      return "[" + getName() + " [-stats] [-path <path>] [-pool <pool>]]\n";
     }
 
     @Override

@@ -406,6 +407,7 @@ public class CacheAdmin extends Configured implements Tool {
           "in a cache pool that we don't have read access for, it " +
           "will not be listed.");
       listing.addRow("<pool>", "List only path cache directives in that pool.");
+      listing.addRow("-stats", "List path-based cache directive statistics.");
       return getShortUsage() + "\n" +
         "List PathBasedCache directives.\n\n" +
         listing.toString();

@@ -423,28 +425,40 @@ public class CacheAdmin extends Configured implements Tool {
       if (poolFilter != null) {
         builder.setPool(poolFilter);
       }
+      boolean printStats = StringUtils.popOption("-stats", args);
       if (!args.isEmpty()) {
         System.err.println("Can't understand argument: " + args.get(0));
         return 1;
       }
-      TableListing tableListing = new TableListing.Builder().
-          addField("ID", Justification.LEFT).
-          addField("POOL", Justification.LEFT).
-          addField("REPLICATION", Justification.LEFT).
-          addField("PATH", Justification.LEFT).
-          build();
+      TableListing.Builder tableBuilder = new TableListing.Builder().
+          addField("ID", Justification.RIGHT).
+          addField("POOL", Justification.LEFT).
+          addField("REPLICATION", Justification.RIGHT).
+          addField("PATH", Justification.LEFT);
+      if (printStats) {
+        tableBuilder.addField("NEEDED", Justification.RIGHT).
+            addField("CACHED", Justification.RIGHT).
+            addField("FILES", Justification.RIGHT);
+      }
+      TableListing tableListing = tableBuilder.build();
+
       DistributedFileSystem dfs = getDFS(conf);
       RemoteIterator<PathBasedCacheDirective> iter =
           dfs.listPathBasedCacheDirectives(builder.build());
       int numEntries = 0;
       while (iter.hasNext()) {
         PathBasedCacheDirective directive = iter.next();
-        String row[] = new String[] {
-            "" + directive.getId(), directive.getPool(),
-            "" + directive.getReplication(),
-            directive.getPath().toUri().getPath(),
-        };
-        tableListing.addRow(row);
+        List<String> row = new LinkedList<String>();
+        row.add("" + directive.getId());
+        row.add(directive.getPool());
+        row.add("" + directive.getReplication());
+        row.add(directive.getPath().toUri().getPath());
+        if (printStats) {
+          row.add("" + directive.getBytesNeeded());
+          row.add("" + directive.getBytesCached());
+          row.add("" + directive.getFilesAffected());
+        }
+        tableListing.addRow(row.toArray(new String[0]));
         numEntries++;
       }
       System.out.print(String.format("Found %d entr%s\n",
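A hedged illustration of the listing output once -stats is passed: the -stats flag and the NEEDED, CACHED, and FILES columns come from this patch, while the subcommand name, the numbers, and the exact column widths below are assumptions for illustration (TableListing controls the final formatting).

    $ hdfs cacheadmin -listDirectives -stats -pool pool1
    Found 2 entries
     ID  POOL   REPLICATION  PATH       NEEDED      CACHED  FILES
      1  pool1            1  /foo    786432000   786432000      2
      2  pool1            2  /bar    393216000   196608000      1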
@@ -734,7 +748,7 @@ public class CacheAdmin extends Configured implements Tool {
         addField("OWNER", Justification.LEFT).
         addField("GROUP", Justification.LEFT).
         addField("MODE", Justification.LEFT).
-        addField("WEIGHT", Justification.LEFT).
+        addField("WEIGHT", Justification.RIGHT).
         build();
     int numResults = 0;
     try {
@@ -30,9 +30,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
  * Example:
  *
  * NAME   OWNER   GROUP   MODE        WEIGHT
  * pool1  andrew  andrew  rwxr-xr-x   100
  * pool2  andrew  andrew  rwxr-xr-x   100
  * pool3  andrew  andrew  rwxr-xr-x   100
  *
  */
 @InterfaceAudience.Private
@@ -368,6 +368,9 @@ message PathBasedCacheDirectiveInfoProto {
   optional string path = 2;
   optional uint32 replication = 3;
   optional string pool = 4;
+  optional int64 bytesNeeded = 5;
+  optional int64 bytesCached = 6;
+  optional int64 filesAffected = 7;
 }
 
 message AddPathBasedCacheDirectiveRequestProto {
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;

@@ -66,7 +67,10 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.GSet;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 

@@ -100,6 +104,7 @@ public class TestPathBasedCacheRequests {
     proto = cluster.getNameNodeRpc();
     prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
     NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+    LogManager.getLogger(CacheReplicationMonitor.class).setLevel(Level.TRACE);
   }
 
   @After

@@ -796,10 +801,65 @@ public class TestPathBasedCacheRequests {
           build());
       waitForCachedBlocks(namenode, 4, 8,
           "testWaitForCachedReplicasInDirectory:1");
+      // Verify that listDirectives gives the stats we want.
+      RemoteIterator<PathBasedCacheDirective> iter =
+          dfs.listPathBasedCacheDirectives(new PathBasedCacheDirective.Builder().
+              setPath(new Path("/foo")).
+              build());
+      PathBasedCacheDirective directive = iter.next();
+      Assert.assertEquals(Long.valueOf(2),
+          directive.getFilesAffected());
+      Assert.assertEquals(Long.valueOf(
+          2 * numBlocksPerFile * BLOCK_SIZE * 2),
+          directive.getBytesNeeded());
+      Assert.assertEquals(Long.valueOf(
+          2 * numBlocksPerFile * BLOCK_SIZE * 2),
+          directive.getBytesCached());
+
+      long id2 = dfs.addPathBasedCacheDirective(
+          new PathBasedCacheDirective.Builder().
+              setPath(new Path("/foo/bar")).
+              setReplication((short)4).
+              setPool(pool).
+              build());
+      // wait for an additional 2 cached replicas to come up
+      waitForCachedBlocks(namenode, 4, 10,
+          "testWaitForCachedReplicasInDirectory:2");
+      // the directory directive's stats are unchanged
+      iter = dfs.listPathBasedCacheDirectives(
+          new PathBasedCacheDirective.Builder().
+              setPath(new Path("/foo")).
+              build());
+      directive = iter.next();
+      Assert.assertEquals(Long.valueOf(2),
+          directive.getFilesAffected());
+      Assert.assertEquals(Long.valueOf(
+          2 * numBlocksPerFile * BLOCK_SIZE * 2),
+          directive.getBytesNeeded());
+      Assert.assertEquals(Long.valueOf(
+          2 * numBlocksPerFile * BLOCK_SIZE * 2),
+          directive.getBytesCached());
+      // verify /foo/bar's stats
+      iter = dfs.listPathBasedCacheDirectives(
+          new PathBasedCacheDirective.Builder().
+              setPath(new Path("/foo/bar")).
+              build());
+      directive = iter.next();
+      Assert.assertEquals(Long.valueOf(1),
+          directive.getFilesAffected());
+      Assert.assertEquals(Long.valueOf(
+          4 * numBlocksPerFile * BLOCK_SIZE),
+          directive.getBytesNeeded());
+      // only 3 because the file only has 3 replicas, not 4 as requested.
+      Assert.assertEquals(Long.valueOf(
+          3 * numBlocksPerFile * BLOCK_SIZE),
+          directive.getBytesCached());
+
       // remove and watch numCached go to 0
       dfs.removePathBasedCacheDirective(id);
+      dfs.removePathBasedCacheDirective(id2);
       waitForCachedBlocks(namenode, 0, 0,
-          "testWaitForCachedReplicasInDirectory:2");
+          "testWaitForCachedReplicasInDirectory:3");
     } finally {
       cluster.shutdown();
     }
@@ -90,7 +90,7 @@
       <comparators>
         <comparator>
           <type>SubstringComparator</type>
           <expected-output>poolparty bob bobgroup rwxrwxrwx 51</expected-output>
         </comparator>
       </comparators>
     </test>

@@ -129,11 +129,11 @@
         </comparator>
         <comparator>
          <type>SubstringComparator</type>
-          <expected-output>bar alice alicegroup rwxr-xr-x 100 </expected-output>
+          <expected-output>bar alice alicegroup rwxr-xr-x 100</expected-output>
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>foo bob bob rw-rw-r-- 100 </expected-output>
+          <expected-output>foo bob bob rw-rw-r-- 100</expected-output>
         </comparator>
       </comparators>
     </test>

@@ -156,7 +156,7 @@
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>foo bob bob rw-rw-r-- 100 </expected-output>
+          <expected-output>foo bob bob rw-rw-r-- 100</expected-output>
         </comparator>
       </comparators>
     </test>

@@ -180,15 +180,15 @@
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>1 pool1 1 /foo</expected-output>
+          <expected-output> 1 pool1 1 /foo</expected-output>
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>2 pool1 1 /bar</expected-output>
+          <expected-output> 2 pool1 1 /bar</expected-output>
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>3 pool1 2 /baz</expected-output>
+          <expected-output> 3 pool1 2 /baz</expected-output>
         </comparator>
       </comparators>
     </test>

@@ -234,11 +234,11 @@
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>8 pool2 1 /baz</expected-output>
+          <expected-output> 8 pool2 1 /baz</expected-output>
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>9 pool2 1 /buz</expected-output>
+          <expected-output> 9 pool2 1 /buz</expected-output>
         </comparator>
       </comparators>
     </test>

@@ -265,11 +265,11 @@
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>10 pool1 1 /foo</expected-output>
+          <expected-output> 10 pool1 1 /foo</expected-output>
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>12 pool2 1 /foo</expected-output>
+          <expected-output> 12 pool2 1 /foo</expected-output>
         </comparator>
       </comparators>
     </test>

@@ -296,7 +296,7 @@
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>16 pool2 1 /foo</expected-output>
+          <expected-output> 16 pool2 1 /foo</expected-output>
         </comparator>
       </comparators>
     </test>

@@ -320,7 +320,7 @@
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>19 pool1 1 /bar</expected-output>
+          <expected-output> 19 pool1 1 /bar</expected-output>
         </comparator>
       </comparators>
     </test>

@@ -349,11 +349,11 @@
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>22 pool1 1 /bar</expected-output>
+          <expected-output> 22 pool1 1 /bar</expected-output>
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>24 pool2 1 /bar</expected-output>
+          <expected-output> 24 pool2 1 /bar</expected-output>
         </comparator>
       </comparators>
     </test>

@@ -379,7 +379,7 @@
         </comparator>
         <comparator>
           <type>SubstringComparator</type>
-          <expected-output>25 pool1 1 /bar3</expected-output>
+          <expected-output> 25 pool1 1 /bar3</expected-output>
         </comparator>
       </comparators>
     </test>