HDFS-6613. Improve logging in caching classes. (wang)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1607697 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2014-07-03 17:13:59 +00:00
parent 160c912ee6
commit 93e23a9915
5 changed files with 133 additions and 176 deletions

View File

@ -262,6 +262,8 @@ Release 2.6.0 - UNRELEASED
IMPROVEMENTS IMPROVEMENTS
HDFS-6613. Improve logging in caching classes. (wang)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -33,8 +33,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -53,8 +51,11 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.ReadOnlyList; import org.apache.hadoop.hdfs.util.ReadOnlyList;
import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
;
/** /**
* Scans the namesystem, scheduling blocks to be cached as appropriate. * Scans the namesystem, scheduling blocks to be cached as appropriate.
@ -65,8 +66,8 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.LimitedPrivate({"HDFS"}) @InterfaceAudience.LimitedPrivate({"HDFS"})
public class CacheReplicationMonitor extends Thread implements Closeable { public class CacheReplicationMonitor extends Thread implements Closeable {
private static final Log LOG = private static final Logger LOG =
LogFactory.getLog(CacheReplicationMonitor.class); LoggerFactory.getLogger(CacheReplicationMonitor.class);
private final FSNamesystem namesystem; private final FSNamesystem namesystem;
@ -207,7 +208,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
LOG.info("Shutting down CacheReplicationMonitor."); LOG.info("Shutting down CacheReplicationMonitor.");
return; return;
} catch (Throwable t) { } catch (Throwable t) {
LOG.fatal("Thread exiting", t); LOG.error("Thread exiting", t);
terminate(1, t); terminate(1, t);
} }
} }
@ -316,11 +317,8 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
scannedDirectives++; scannedDirectives++;
// Skip processing this entry if it has expired // Skip processing this entry if it has expired
if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) { if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
if (LOG.isDebugEnabled()) { LOG.debug("Directive {}: the directive expired at {} (now = {})",
LOG.debug("Directive " + directive.getId() + ": the directive " + directive.getId(), directive.getExpiryTime(), now);
"expired at " + directive.getExpiryTime() + " (now = " +
now + ")");
}
continue; continue;
} }
String path = directive.getPath(); String path = directive.getPath();
@ -329,17 +327,14 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
node = fsDir.getINode(path); node = fsDir.getINode(path);
} catch (UnresolvedLinkException e) { } catch (UnresolvedLinkException e) {
// We don't cache through symlinks // We don't cache through symlinks
if (LOG.isDebugEnabled()) { LOG.debug("Directive {}: got UnresolvedLinkException while resolving "
LOG.debug("Directive " + directive.getId() + + "path {}", directive.getId(), path
": got UnresolvedLinkException while resolving path " + path); );
}
continue; continue;
} }
if (node == null) { if (node == null) {
if (LOG.isDebugEnabled()) { LOG.debug("Directive {}: No inode found at {}", directive.getId(),
LOG.debug("Directive " + directive.getId() + path);
": No inode found at " + path);
}
} else if (node.isDirectory()) { } else if (node.isDirectory()) {
INodeDirectory dir = node.asDirectory(); INodeDirectory dir = node.asDirectory();
ReadOnlyList<INode> children = dir ReadOnlyList<INode> children = dir
@ -352,10 +347,8 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
} else if (node.isFile()) { } else if (node.isFile()) {
rescanFile(directive, node.asFile()); rescanFile(directive, node.asFile());
} else { } else {
if (LOG.isDebugEnabled()) { LOG.debug("Directive {}: ignoring non-directive, non-file inode {} ",
LOG.debug("Directive " + directive.getId() + directive.getId(), node);
": ignoring non-directive, non-file inode " + node);
}
} }
} }
} }
@ -381,15 +374,13 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
// do not cache this file. // do not cache this file.
CachePool pool = directive.getPool(); CachePool pool = directive.getPool();
if (pool.getBytesNeeded() > pool.getLimit()) { if (pool.getBytesNeeded() > pool.getLimit()) {
if (LOG.isDebugEnabled()) { LOG.debug("Directive {}: not scanning file {} because " +
LOG.debug(String.format("Directive %d: not scanning file %s because " + "bytesNeeded for pool {} is {}, but the pool's limit is {}",
"bytesNeeded for pool %s is %d, but the pool's limit is %d",
directive.getId(), directive.getId(),
file.getFullPathName(), file.getFullPathName(),
pool.getPoolName(), pool.getPoolName(),
pool.getBytesNeeded(), pool.getBytesNeeded(),
pool.getLimit())); pool.getLimit());
}
return; return;
} }
@ -397,11 +388,10 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
for (BlockInfo blockInfo : blockInfos) { for (BlockInfo blockInfo : blockInfos) {
if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) { if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) {
// We don't try to cache blocks that are under construction. // We don't try to cache blocks that are under construction.
if (LOG.isTraceEnabled()) { LOG.trace("Directive {}: can't cache block {} because it is in state "
LOG.trace("Directive " + directive.getId() + ": can't cache " + + "{}, not COMPLETE.", directive.getId(), blockInfo,
"block " + blockInfo + " because it is in state " + blockInfo.getBlockUCState()
blockInfo.getBlockUCState() + ", not COMPLETE."); );
}
continue; continue;
} }
Block block = new Block(blockInfo.getBlockId()); Block block = new Block(blockInfo.getBlockId());
@ -415,7 +405,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
// Update bytesUsed using the current replication levels. // Update bytesUsed using the current replication levels.
// Assumptions: we assume that all the blocks are the same length // Assumptions: we assume that all the blocks are the same length
// on each datanode. We can assume this because we're only caching // on each datanode. We can assume this because we're only caching
// blocks in state COMMITTED. // blocks in state COMPLETE.
// Note that if two directives are caching the same block(s), they will // Note that if two directives are caching the same block(s), they will
// both get them added to their bytesCached. // both get them added to their bytesCached.
List<DatanodeDescriptor> cachedOn = List<DatanodeDescriptor> cachedOn =
@ -441,21 +431,16 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
ocblock.setReplicationAndMark(directive.getReplication(), mark); ocblock.setReplicationAndMark(directive.getReplication(), mark);
} }
} }
if (LOG.isTraceEnabled()) { LOG.trace("Directive {}: setting replication for block {} to {}",
LOG.trace("Directive " + directive.getId() + ": setting replication " + directive.getId(), blockInfo, ocblock.getReplication());
"for block " + blockInfo + " to " + ocblock.getReplication());
}
} }
// Increment the "cached" statistics // Increment the "cached" statistics
directive.addBytesCached(cachedTotal); directive.addBytesCached(cachedTotal);
if (cachedTotal == neededTotal) { if (cachedTotal == neededTotal) {
directive.addFilesCached(1); directive.addFilesCached(1);
} }
if (LOG.isDebugEnabled()) { LOG.debug("Directive {}: caching {}: {}/{} bytes", directive.getId(),
LOG.debug("Directive " + directive.getId() + ": caching " + file.getFullPathName(), cachedTotal, neededTotal);
file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal +
" bytes");
}
} }
private String findReasonForNotCaching(CachedBlock cblock, private String findReasonForNotCaching(CachedBlock cblock,
@ -512,11 +497,9 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
iter.hasNext(); ) { iter.hasNext(); ) {
DatanodeDescriptor datanode = iter.next(); DatanodeDescriptor datanode = iter.next();
if (!cblock.isInList(datanode.getCached())) { if (!cblock.isInList(datanode.getCached())) {
if (LOG.isTraceEnabled()) { LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} "
LOG.trace("Block " + cblock.getBlockId() + ": removing from " + + "because the DataNode uncached it.", cblock.getBlockId(),
"PENDING_UNCACHED for node " + datanode.getDatanodeUuid() + datanode.getDatanodeUuid());
"because the DataNode uncached it.");
}
datanode.getPendingUncached().remove(cblock); datanode.getPendingUncached().remove(cblock);
iter.remove(); iter.remove();
} }
@ -526,10 +509,8 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
String reason = findReasonForNotCaching(cblock, blockInfo); String reason = findReasonForNotCaching(cblock, blockInfo);
int neededCached = 0; int neededCached = 0;
if (reason != null) { if (reason != null) {
if (LOG.isTraceEnabled()) { LOG.trace("Block {}: can't cache block because it is {}",
LOG.trace("Block " + cblock.getBlockId() + ": can't cache " + cblock.getBlockId(), reason);
"block because it is " + reason);
}
} else { } else {
neededCached = cblock.getReplication(); neededCached = cblock.getReplication();
} }
@ -541,12 +522,12 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
DatanodeDescriptor datanode = iter.next(); DatanodeDescriptor datanode = iter.next();
datanode.getPendingCached().remove(cblock); datanode.getPendingCached().remove(cblock);
iter.remove(); iter.remove();
if (LOG.isTraceEnabled()) { LOG.trace("Block {}: removing from PENDING_CACHED for node {}"
LOG.trace("Block " + cblock.getBlockId() + ": removing from " + + "because we already have {} cached replicas and we only" +
"PENDING_CACHED for node " + datanode.getDatanodeUuid() + " need {}",
"because we already have " + numCached + " cached " + cblock.getBlockId(), datanode.getDatanodeUuid(), numCached,
"replicas and we only need " + neededCached); neededCached
} );
} }
} }
if (numCached < neededCached) { if (numCached < neededCached) {
@ -556,12 +537,11 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
DatanodeDescriptor datanode = iter.next(); DatanodeDescriptor datanode = iter.next();
datanode.getPendingUncached().remove(cblock); datanode.getPendingUncached().remove(cblock);
iter.remove(); iter.remove();
if (LOG.isTraceEnabled()) { LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} "
LOG.trace("Block " + cblock.getBlockId() + ": removing from " + + "because we only have {} cached replicas and we need " +
"PENDING_UNCACHED for node " + datanode.getDatanodeUuid() + "{}", cblock.getBlockId(), datanode.getDatanodeUuid(),
"because we only have " + numCached + " cached replicas " + numCached, neededCached
"and we need " + neededCached); );
}
} }
} }
int neededUncached = numCached - int neededUncached = numCached -
@ -581,11 +561,10 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
pendingUncached.isEmpty() && pendingUncached.isEmpty() &&
pendingCached.isEmpty()) { pendingCached.isEmpty()) {
// we have nothing more to do with this block. // we have nothing more to do with this block.
if (LOG.isTraceEnabled()) { LOG.trace("Block {}: removing from cachedBlocks, since neededCached "
LOG.trace("Block " + cblock.getBlockId() + ": removing from " + + "== 0, and pendingUncached and pendingCached are empty.",
"cachedBlocks, since neededCached == 0, and " + cblock.getBlockId()
"pendingUncached and pendingCached are empty."); );
}
cbIter.remove(); cbIter.remove();
} }
} }
@ -643,18 +622,14 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
BlockInfo blockInfo = blockManager. BlockInfo blockInfo = blockManager.
getStoredBlock(new Block(cachedBlock.getBlockId())); getStoredBlock(new Block(cachedBlock.getBlockId()));
if (blockInfo == null) { if (blockInfo == null) {
if (LOG.isDebugEnabled()) { LOG.debug("Block {}: can't add new cached replicas," +
LOG.debug("Block " + cachedBlock.getBlockId() + ": can't add new " + " because there is no record of this block " +
"cached replicas, because there is no record of this block " + "on the NameNode.", cachedBlock.getBlockId());
"on the NameNode.");
}
return; return;
} }
if (!blockInfo.isComplete()) { if (!blockInfo.isComplete()) {
if (LOG.isDebugEnabled()) { LOG.debug("Block {}: can't cache this block, because it is not yet"
LOG.debug("Block " + cachedBlock.getBlockId() + ": can't cache this " + + " complete.", cachedBlock.getBlockId());
"block, because it is not yet complete.");
}
return; return;
} }
// Filter the list of replicas to only the valid targets // Filter the list of replicas to only the valid targets
@ -678,7 +653,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
if (pendingCached.contains(datanode) || cached.contains(datanode)) { if (pendingCached.contains(datanode) || cached.contains(datanode)) {
continue; continue;
} }
long pendingCapacity = datanode.getCacheRemaining(); long pendingBytes = 0;
// Subtract pending cached blocks from effective capacity // Subtract pending cached blocks from effective capacity
Iterator<CachedBlock> it = datanode.getPendingCached().iterator(); Iterator<CachedBlock> it = datanode.getPendingCached().iterator();
while (it.hasNext()) { while (it.hasNext()) {
@ -686,7 +661,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
BlockInfo info = BlockInfo info =
blockManager.getStoredBlock(new Block(cBlock.getBlockId())); blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
if (info != null) { if (info != null) {
pendingCapacity -= info.getNumBytes(); pendingBytes -= info.getNumBytes();
} }
} }
it = datanode.getPendingUncached().iterator(); it = datanode.getPendingUncached().iterator();
@ -696,17 +671,17 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
BlockInfo info = BlockInfo info =
blockManager.getStoredBlock(new Block(cBlock.getBlockId())); blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
if (info != null) { if (info != null) {
pendingCapacity += info.getNumBytes(); pendingBytes += info.getNumBytes();
} }
} }
long pendingCapacity = pendingBytes + datanode.getCacheRemaining();
if (pendingCapacity < blockInfo.getNumBytes()) { if (pendingCapacity < blockInfo.getNumBytes()) {
if (LOG.isTraceEnabled()) { LOG.trace("Block {}: DataNode {} is not a valid possibility " +
LOG.trace("Block " + blockInfo.getBlockId() + ": DataNode " + "because the block has size {}, but the DataNode only has {}" +
datanode.getDatanodeUuid() + " is not a valid possibility " + "bytes of cache remaining ({} pending bytes, {} already cached.",
"because the block has size " + blockInfo.getNumBytes() + ", but " + blockInfo.getBlockId(), datanode.getDatanodeUuid(),
"the DataNode only has " + datanode.getCacheRemaining() + " " + blockInfo.getNumBytes(), pendingCapacity, pendingBytes,
"bytes of cache remaining."); datanode.getCacheRemaining());
}
outOfCapacity++; outOfCapacity++;
continue; continue;
} }
@ -715,22 +690,20 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities, List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities,
neededCached, blockManager.getDatanodeManager().getStaleInterval()); neededCached, blockManager.getDatanodeManager().getStaleInterval());
for (DatanodeDescriptor datanode : chosen) { for (DatanodeDescriptor datanode : chosen) {
if (LOG.isTraceEnabled()) { LOG.trace("Block {}: added to PENDING_CACHED on DataNode {}",
LOG.trace("Block " + blockInfo.getBlockId() + ": added to " + blockInfo.getBlockId(), datanode.getDatanodeUuid());
"PENDING_CACHED on DataNode " + datanode.getDatanodeUuid());
}
pendingCached.add(datanode); pendingCached.add(datanode);
boolean added = datanode.getPendingCached().add(cachedBlock); boolean added = datanode.getPendingCached().add(cachedBlock);
assert added; assert added;
} }
// We were unable to satisfy the requested replication factor // We were unable to satisfy the requested replication factor
if (neededCached > chosen.size()) { if (neededCached > chosen.size()) {
if (LOG.isDebugEnabled()) { LOG.debug("Block {}: we only have {} of {} cached replicas."
LOG.debug("Block " + blockInfo.getBlockId() + ": we only have " + + " {} DataNodes have insufficient cache capacity.",
(cachedBlock.getReplication() - neededCached + chosen.size()) + blockInfo.getBlockId(),
" of " + cachedBlock.getReplication() + " cached replicas. " + (cachedBlock.getReplication() - neededCached + chosen.size()),
outOfCapacity + " DataNodes have insufficient cache capacity."); cachedBlock.getReplication(), outOfCapacity
} );
} }
} }

View File

@ -37,8 +37,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ChecksumException;
@ -47,6 +45,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2) * Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2)
@ -101,7 +101,8 @@ public class FsDatasetCache {
} }
} }
private static final Log LOG = LogFactory.getLog(FsDatasetCache.class); private static final Logger LOG = LoggerFactory.getLogger(FsDatasetCache
.class);
/** /**
* Stores MappableBlock objects and the states they're in. * Stores MappableBlock objects and the states they're in.
@ -245,21 +246,17 @@ public class FsDatasetCache {
ExtendedBlockId key = new ExtendedBlockId(blockId, bpid); ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
Value prevValue = mappableBlockMap.get(key); Value prevValue = mappableBlockMap.get(key);
if (prevValue != null) { if (prevValue != null) {
if (LOG.isDebugEnabled()) { LOG.debug("Block with id {}, pool {} already exists in the "
LOG.debug("Block with id " + blockId + ", pool " + bpid + + "FsDatasetCache with state {}", blockId, bpid, prevValue.state
" already exists in the FsDatasetCache with state " + );
prevValue.state);
}
numBlocksFailedToCache.incrementAndGet(); numBlocksFailedToCache.incrementAndGet();
return; return;
} }
mappableBlockMap.put(key, new Value(null, State.CACHING)); mappableBlockMap.put(key, new Value(null, State.CACHING));
volumeExecutor.execute( volumeExecutor.execute(
new CachingTask(key, blockFileName, length, genstamp)); new CachingTask(key, blockFileName, length, genstamp));
if (LOG.isDebugEnabled()) { LOG.debug("Initiating caching for Block with id {}, pool {}", blockId,
LOG.debug("Initiating caching for Block with id " + blockId + bpid);
", pool " + bpid);
}
} }
synchronized void uncacheBlock(String bpid, long blockId) { synchronized void uncacheBlock(String bpid, long blockId) {
@ -270,44 +267,34 @@ public class FsDatasetCache {
processBlockMunlockRequest(key)) { processBlockMunlockRequest(key)) {
// TODO: we probably want to forcibly uncache the block (and close the // TODO: we probably want to forcibly uncache the block (and close the
// shm) after a certain timeout has elapsed. // shm) after a certain timeout has elapsed.
if (LOG.isDebugEnabled()) { LOG.debug("{} is anchored, and can't be uncached now.", key);
LOG.debug(key + " is anchored, and can't be uncached now.");
}
return; return;
} }
if (prevValue == null) { if (prevValue == null) {
if (LOG.isDebugEnabled()) { LOG.debug("Block with id {}, pool {} does not need to be uncached, "
LOG.debug("Block with id " + blockId + ", pool " + bpid + " " + + "because it is not currently in the mappableBlockMap.", blockId,
"does not need to be uncached, because it is not currently " + bpid);
"in the mappableBlockMap.");
}
numBlocksFailedToUncache.incrementAndGet(); numBlocksFailedToUncache.incrementAndGet();
return; return;
} }
switch (prevValue.state) { switch (prevValue.state) {
case CACHING: case CACHING:
if (LOG.isDebugEnabled()) { LOG.debug("Cancelling caching for block with id {}, pool {}.", blockId,
LOG.debug("Cancelling caching for block with id " + blockId + bpid);
", pool " + bpid + ".");
}
mappableBlockMap.put(key, mappableBlockMap.put(key,
new Value(prevValue.mappableBlock, State.CACHING_CANCELLED)); new Value(prevValue.mappableBlock, State.CACHING_CANCELLED));
break; break;
case CACHED: case CACHED:
if (LOG.isDebugEnabled()) { LOG.debug(
LOG.debug("Block with id " + blockId + ", pool " + bpid + " " + "Block with id {}, pool {} has been scheduled for uncaching" + ".",
"has been scheduled for uncaching."); blockId, bpid);
}
mappableBlockMap.put(key, mappableBlockMap.put(key,
new Value(prevValue.mappableBlock, State.UNCACHING)); new Value(prevValue.mappableBlock, State.UNCACHING));
uncachingExecutor.execute(new UncachingTask(key)); uncachingExecutor.execute(new UncachingTask(key));
break; break;
default: default:
if (LOG.isDebugEnabled()) { LOG.debug("Block with id {}, pool {} does not need to be uncached, "
LOG.debug("Block with id " + blockId + ", pool " + bpid + " " + + "because it is in state {}.", blockId, bpid, prevValue.state);
"does not need to be uncached, because it is " +
"in state " + prevValue.state + ".");
}
numBlocksFailedToUncache.incrementAndGet(); numBlocksFailedToUncache.incrementAndGet();
break; break;
} }
@ -386,10 +373,8 @@ public class FsDatasetCache {
} }
mappableBlockMap.put(key, new Value(mappableBlock, State.CACHED)); mappableBlockMap.put(key, new Value(mappableBlock, State.CACHED));
} }
if (LOG.isDebugEnabled()) { LOG.debug("Successfully cached {}. We are now caching {} bytes in"
LOG.debug("Successfully cached " + key + ". We are now caching " + + " total.", key, newUsedBytes);
newUsedBytes + " bytes in total.");
}
dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(key); dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(key);
numBlocksCached.addAndGet(1); numBlocksCached.addAndGet(1);
dataset.datanode.getMetrics().incrBlocksCached(1); dataset.datanode.getMetrics().incrBlocksCached(1);
@ -399,12 +384,10 @@ public class FsDatasetCache {
IOUtils.closeQuietly(metaIn); IOUtils.closeQuietly(metaIn);
if (!success) { if (!success) {
if (reservedBytes) { if (reservedBytes) {
newUsedBytes = usedBytesCount.release(length); usedBytesCount.release(length);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Caching of " + key + " was aborted. We are now " +
"caching only " + newUsedBytes + " + bytes in total.");
} }
LOG.debug("Caching of {} was aborted. We are now caching only {} "
+ "bytes in total.", key, usedBytesCount.get());
if (mappableBlock != null) { if (mappableBlock != null) {
mappableBlock.close(); mappableBlock.close();
} }
@ -444,10 +427,7 @@ public class FsDatasetCache {
usedBytesCount.release(value.mappableBlock.getLength()); usedBytesCount.release(value.mappableBlock.getLength());
numBlocksCached.addAndGet(-1); numBlocksCached.addAndGet(-1);
dataset.datanode.getMetrics().incrBlocksUncached(1); dataset.datanode.getMetrics().incrBlocksUncached(1);
if (LOG.isDebugEnabled()) { LOG.debug("Uncaching of {} completed. usedBytes = {}", key, newUsedBytes);
LOG.debug("Uncaching of " + key + " completed. " +
"usedBytes = " + newUsedBytes);
}
} }
} }

View File

@ -43,8 +43,6 @@ import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
@ -85,6 +83,8 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -99,7 +99,7 @@ import com.google.common.collect.Lists;
*/ */
@InterfaceAudience.LimitedPrivate({"HDFS"}) @InterfaceAudience.LimitedPrivate({"HDFS"})
public final class CacheManager { public final class CacheManager {
public static final Log LOG = LogFactory.getLog(CacheManager.class); public static final Logger LOG = LoggerFactory.getLogger(CacheManager.class);
private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f; private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f;
@ -205,8 +205,8 @@ public final class CacheManager {
DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT, DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT,
DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT); DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT);
if (cachedBlocksPercent < MIN_CACHED_BLOCKS_PERCENT) { if (cachedBlocksPercent < MIN_CACHED_BLOCKS_PERCENT) {
LOG.info("Using minimum value " + MIN_CACHED_BLOCKS_PERCENT + LOG.info("Using minimum value {} for {}", MIN_CACHED_BLOCKS_PERCENT,
" for " + DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT); DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT);
cachedBlocksPercent = MIN_CACHED_BLOCKS_PERCENT; cachedBlocksPercent = MIN_CACHED_BLOCKS_PERCENT;
} }
this.cachedBlocks = new LightWeightGSet<CachedBlock, CachedBlock>( this.cachedBlocks = new LightWeightGSet<CachedBlock, CachedBlock>(
@ -346,10 +346,8 @@ public final class CacheManager {
*/ */
private static long validateExpiryTime(CacheDirectiveInfo info, private static long validateExpiryTime(CacheDirectiveInfo info,
long maxRelativeExpiryTime) throws InvalidRequestException { long maxRelativeExpiryTime) throws InvalidRequestException {
if (LOG.isTraceEnabled()) { LOG.trace("Validating directive {} pool maxRelativeExpiryTime {}", info,
LOG.trace("Validating directive " + info maxRelativeExpiryTime);
+ " pool maxRelativeExpiryTime " + maxRelativeExpiryTime);
}
final long now = new Date().getTime(); final long now = new Date().getTime();
final long maxAbsoluteExpiryTime = now + maxRelativeExpiryTime; final long maxAbsoluteExpiryTime = now + maxRelativeExpiryTime;
if (info == null || info.getExpiration() == null) { if (info == null || info.getExpiration() == null) {
@ -539,7 +537,7 @@ public final class CacheManager {
LOG.warn("addDirective of " + info + " failed: ", e); LOG.warn("addDirective of " + info + " failed: ", e);
throw e; throw e;
} }
LOG.info("addDirective of " + info + " successful."); LOG.info("addDirective of {} successful.", info);
return directive.toInfo(); return directive.toInfo();
} }
@ -641,8 +639,7 @@ public final class CacheManager {
LOG.warn("modifyDirective of " + idString + " failed: ", e); LOG.warn("modifyDirective of " + idString + " failed: ", e);
throw e; throw e;
} }
LOG.info("modifyDirective of " + idString + " successfully applied " + LOG.info("modifyDirective of {} successfully applied {}.", idString, info);
info+ ".");
} }
private void removeInternal(CacheDirective directive) private void removeInternal(CacheDirective directive)
@ -779,7 +776,7 @@ public final class CacheManager {
LOG.info("addCachePool of " + info + " failed: ", e); LOG.info("addCachePool of " + info + " failed: ", e);
throw e; throw e;
} }
LOG.info("addCachePool of " + info + " successful."); LOG.info("addCachePool of {} successful.", info);
return pool.getInfo(true); return pool.getInfo(true);
} }
@ -842,8 +839,8 @@ public final class CacheManager {
LOG.info("modifyCachePool of " + info + " failed: ", e); LOG.info("modifyCachePool of " + info + " failed: ", e);
throw e; throw e;
} }
LOG.info("modifyCachePool of " + info.getPoolName() + " successful; " LOG.info("modifyCachePool of {} successful; {}", info.getPoolName(),
+ bld.toString()); bld.toString());
} }
/** /**
@ -935,11 +932,9 @@ public final class CacheManager {
if (metrics != null) { if (metrics != null) {
metrics.addCacheBlockReport((int) (endTime - startTime)); metrics.addCacheBlockReport((int) (endTime - startTime));
} }
if (LOG.isDebugEnabled()) { LOG.debug("Processed cache report from {}, blocks: {}, " +
LOG.debug("Processed cache report from " "processing time: {} msecs", datanodeID, blockIds.size(),
+ datanodeID + ", blocks: " + blockIds.size() (endTime - startTime));
+ ", processing time: " + (endTime - startTime) + " msecs");
}
} }
private void processCacheReportImpl(final DatanodeDescriptor datanode, private void processCacheReportImpl(final DatanodeDescriptor datanode,
@ -950,6 +945,8 @@ public final class CacheManager {
CachedBlocksList pendingCachedList = datanode.getPendingCached(); CachedBlocksList pendingCachedList = datanode.getPendingCached();
for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) { for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
long blockId = iter.next(); long blockId = iter.next();
LOG.trace("Cache report from datanode {} has block {}", datanode,
blockId);
CachedBlock cachedBlock = CachedBlock cachedBlock =
new CachedBlock(blockId, (short)0, false); new CachedBlock(blockId, (short)0, false);
CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock); CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock);
@ -959,15 +956,18 @@ public final class CacheManager {
cachedBlock = prevCachedBlock; cachedBlock = prevCachedBlock;
} else { } else {
cachedBlocks.put(cachedBlock); cachedBlocks.put(cachedBlock);
LOG.trace("Added block {} to cachedBlocks", cachedBlock);
} }
// Add the block to the datanode's implicit cached block list // Add the block to the datanode's implicit cached block list
// if it's not already there. Similarly, remove it from the pending // if it's not already there. Similarly, remove it from the pending
// cached block list if it exists there. // cached block list if it exists there.
if (!cachedBlock.isPresent(cachedList)) { if (!cachedBlock.isPresent(cachedList)) {
cachedList.add(cachedBlock); cachedList.add(cachedBlock);
LOG.trace("Added block {} to CACHED list.", cachedBlock);
} }
if (cachedBlock.isPresent(pendingCachedList)) { if (cachedBlock.isPresent(pendingCachedList)) {
pendingCachedList.remove(cachedBlock); pendingCachedList.remove(cachedBlock);
LOG.trace("Removed block {} from PENDING_CACHED list.", cachedBlock);
} }
} }
} }

View File

@ -34,6 +34,7 @@ import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Iterator; import java.util.Iterator;
@ -682,6 +683,12 @@ public class TestCacheDirectives {
} finally { } finally {
namesystem.readUnlock(); namesystem.readUnlock();
} }
LOG.info(logString + " cached blocks: have " + numCachedBlocks +
" / " + expectedCachedBlocks + ". " +
"cached replicas: have " + numCachedReplicas +
" / " + expectedCachedReplicas);
if (expectedCachedBlocks == -1 || if (expectedCachedBlocks == -1 ||
numCachedBlocks == expectedCachedBlocks) { numCachedBlocks == expectedCachedBlocks) {
if (expectedCachedReplicas == -1 || if (expectedCachedReplicas == -1 ||
@ -689,10 +696,6 @@ public class TestCacheDirectives {
return true; return true;
} }
} }
LOG.info(logString + " cached blocks: have " + numCachedBlocks +
" / " + expectedCachedBlocks + ". " +
"cached replicas: have " + numCachedReplicas +
" / " + expectedCachedReplicas);
return false; return false;
} }
}, 500, 60000); }, 500, 60000);
@ -1415,7 +1418,10 @@ public class TestCacheDirectives {
for (DataNode dn : cluster.getDataNodes()) { for (DataNode dn : cluster.getDataNodes()) {
DatanodeDescriptor descriptor = DatanodeDescriptor descriptor =
datanodeManager.getDatanode(dn.getDatanodeId()); datanodeManager.getDatanode(dn.getDatanodeId());
Assert.assertTrue(descriptor.getPendingCached().isEmpty()); Assert.assertTrue("Pending cached list of " + descriptor +
" is not empty, "
+ Arrays.toString(descriptor.getPendingCached().toArray()),
descriptor.getPendingCached().isEmpty());
} }
} finally { } finally {
cluster.getNamesystem().readUnlock(); cluster.getNamesystem().readUnlock();
@ -1430,10 +1436,6 @@ public class TestCacheDirectives {
int numCachedReplicas = (int) ((CACHE_CAPACITY*NUM_DATANODES)/BLOCK_SIZE); int numCachedReplicas = (int) ((CACHE_CAPACITY*NUM_DATANODES)/BLOCK_SIZE);
DFSTestUtil.createFile(dfs, fileName, fileLen, (short) NUM_DATANODES, DFSTestUtil.createFile(dfs, fileName, fileLen, (short) NUM_DATANODES,
0xFADED); 0xFADED);
// Set up a log appender watcher
final LogVerificationAppender appender = new LogVerificationAppender();
final Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
dfs.addCachePool(new CachePoolInfo("pool")); dfs.addCachePool(new CachePoolInfo("pool"));
dfs.addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool") dfs.addCacheDirective(new CacheDirectiveInfo.Builder().setPool("pool")
.setPath(fileName).setReplication((short) 1).build()); .setPath(fileName).setReplication((short) 1).build());